Analyzing Twitter Data using DataSift, MongoDB and Pig

If you followed our recent postings on the updated Oracle Information Management Reference Architecture, one of the key concepts we talk about is the “data reservoir”. This is a pool of additional data that sits alongside your data warehouse, typically stored in Hadoop or NoSQL databases, where you hold unstructured, semi-structured or unprocessed structured data in large volumes and at low cost. Adding a data reservoir gives you the ability to leverage the types of data sources that were previously thought of as too “messy”, too high-volume to store for any serious length of time, or as requiring processing and storage tools that aren’t part of the usual relational data warehouse toolset.

By formally including these sources in your overall information management architecture, though, with common tools, security and data governance across the entire dataset, you give your users the ability to consider the full “360-degree view” of their customers and their interactions with the market.

To take an example, a few weeks ago I posted a series of articles on the blog where I captured user activity on our website, http://www.rittmanmead.com, transported it to one of our Hadoop clusters using Apache Flume, and then analysed it using Hive, Pig and finally Spark. In one of the articles I used Pig and a geocoding API to determine the country each website visitor came from, and in a final five-part series I automated the whole process using ODI12c, copying the final output tables into Oracle using Oracle Loader for Hadoop. This is quite a nice example of ETL-offloading into Hadoop, with an element of Hadoop-native event capture using Flume, but once the processing has finished the data moves out of Hadoop and into the Oracle database.

What would be more interesting, though, is to add data into Hadoop that’s permanent rather than transitory as part of an ETL process, and so start building out this concept of the “data reservoir”. Taking our website activity dataset, something that would really add context to visits to our site is the corresponding activity on social networks: who’s linking to our posts, who’s discussing them, whether those discussions are positive or negative, and which wider networks those people belong to. Twitter is a good place to start as it’s where we see our articles and activities discussed most, but it’d be good to build this picture out over time by adding activity on networks such as Facebook, YouTube, LinkedIn and Google+; if we did that, we’d be able to consider a much broader and richer picture of activity around Rittman Mead, potentially correlating visits to our website with mentions of us in the press, comments made by our team and the wider picture of what’s going on in our world.

There are a number of ways to bring Twitter data into your Hadoop cluster or data warehouse, but the most convenient we’ve found is DataSift, a social media aggregation service that licenses raw feeds from the likes of Twitter, Facebook, WordPress and other social media platforms, enhances those feeds with sentiment scores and other attributes, and then sells access to them via a number of formats and APIs. Accessing Twitter data through DataSift costs money, particularly if you want to go back and look at historical activity rather than just filtering new Twitter activity on a few keywords, but they’re very developer-friendly and can provide far greater volumes of firehose activity than the standard Twitter developer API allows.

So assuming you can get access to a stream of Twitter data on a particular topic - in our case, all mentions of our website, our team’s Twitter handles, retweets of our content and so on - the question then becomes one of how to store the data. Looking at the DataSift Sample Output page, each of these streams delivers its payload as JSON documents: nested structures that group categories of tweet metadata within parent elements, together making up the full tweet data and metadata record.

And there’s a good reason for this: individual tweets might not use every possible bit of tweet metadata, for example omitting the “mentions” or “retweets” entries if those don’t apply to a particular message. Certain bits of metadata might be repeated an arbitrary number of times - @mentions, for example - and the JSON document might have a different structure altogether if a different schema version was used for a particular tweet. Altogether it’s not an easy type of data structure for a relational database to hold - though Oracle Database 12.1.0.2 has just introduced native JSON support - but NoSQL databases, in contrast, find these sorts of structures easy, and one of the most popular for this type of work is MongoDB.
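
To make that concrete, here’s a heavily simplified sketch of two tweet documents, with field names modelled loosely on the DataSift output we’ll see later in this post (so treat them as illustrative rather than the real payload); the first has hashtags and mentions, the second has neither and uses a different schema version, and squeezing both into a single fixed relational table would mean a raft of nullable columns and child tables:

{ "interaction" : {
    "author"   : { "username" : "markrittman" },
    "content"  : "New post on the data reservoir concept ...",
    "hashtags" : [ "bigdata", "hadoop" ],
    "mentions" : [ "rittmanmead" ],
    "schema"   : { "version" : 3 } } }

{ "interaction" : {
    "author"  : { "username" : "mRainey" },
    "content" : "Off to the game tonight!",
    "schema"  : { "version" : 2 } } }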

MongoDB is an open-source “document” database that’s probably best known to the Oracle world through this internet cartoon; what the video is getting at is NoSQL advocates recommending databases such as MongoDB for large-scale web work when something more mainstream like MySQL would do the job better, but where NoSQL and document-style databases come into their own is in storing exactly this type of semi-structured, schema-on-read dataset. In fact, DataSift supports MongoDB as an end-point for their Twitter feeds, so let’s go ahead and set up a MongoDB database, prepare it for the Twitter data, and then set up a DataSift feed into it.

MongoDB installation on Linux, for example to run alongside a Hadoop installation, is pretty straightforward and involves adding a YUM repository and then running “sudo yum install mongodb-org” (there’s an OS X installation too, but I wanted to run this server-side on my Hadoop cluster). Once you’ve installed the MongoDB software, you start the mongod service to enable the server element, and then log into the mongo command-shell to create a new database.
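
In outline the setup looks something like this - a sketch rather than a definitive install guide, and the yum repository details below were correct for the MongoDB 2.6-era packages, so check the MongoDB docs for your version:

# /etc/yum.repos.d/mongodb.repo - repository definition (URL was correct for the 2.6-series packages)
[mongodb]
name=MongoDB Repository
baseurl=http://downloads-distro.mongodb.org/repo/redhat/os/x86_64/
gpgcheck=0
enabled=1

# install the server, shell and tools, then start the server process
sudo yum install -y mongodb-org
sudo service mongod start

# connect using the command-line shell
mongo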

MongoDB, being a schema-on-read database, doesn’t require you to define a database schema up-front; instead, the schema comes from the data you load into it, with MongoDB’s equivalent of tables called “collections”, and those collections made up of “documents”, analogous to rows in Oracle. Where it gets interesting is that databases and collections only get created when you first start using them, and individual documents can have slightly, or even completely, different structures from each other - which makes them ideal for holding the sorts of datasets generated by Twitter, Facebook and DataSift.

[root@cdh51-node1 ~]# mongo
MongoDB shell version: 2.6.4
connecting to: test
> use datasift2
switched to db datasift2

Let’s create a couple of simple documents and add them to a collection. Note that a document exists just by declaring it, and the collection gets created as soon as I insert documents into it. Note also that the query language we’re using to work with MongoDB is JavaScript, again making it particularly suited to JSON documents and web-type environments.

> a = { name : "mark" }
{ "name" : "mark" }
> b = { product : "chair", size : "L" }
{ "product" : "chair", "size" : "L" }
> db
datasift2
> db.testData.insert(a)
WriteResult({ "nInserted" : 1 })
> db.testData.insert(b)
WriteResult({ "nInserted" : 1 })
> db.testData.find()
{ "_id" : ObjectId("54094081b5b6021fe9bc8b10"), "name" : "mark" }
{ "_id" : ObjectId("54094088b5b6021fe9bc8b11"), "product" : "chair", "size" : "L" }

Note also how the second entry (document) in the collection has a different schema from the entry above it - perfect for our semi-structured Twitter data, and something we can store as-is in MongoDB in this loose format, applying more formal structures and schemas only when we come to access the data - as we’ll do in a moment using Pig, and more formally using ODI and Hive in the next article in this series.

Setting up the Twitter feed from DataSift is a two-stage process once you’ve got an account with them and an API key: first you define your search terms against a nested document model for the data source, then you activate the feed - in this case into my MongoDB database - and wait for the tweets to roll in. For my feed I selected tweets written by myself and some of the Rittman Mead team, tweets mentioning us, and tweets that include links to our blog in the main tweet contents (there’s also a graphical query designer, but I prefer to write the filters by hand using what DataSift call their Curated Stream Definition Language, or CSDL).
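
I won’t reproduce my exact filter here, but a simplified, illustrative CSDL definition along those lines might look something like the snippet below - the targets and operators are from memory of the DataSift docs, so treat the syntax as approximate rather than something to copy verbatim:

interaction.author.username in "markrittman, mRainey, rittmanmead"
OR interaction.content contains_any "rittmanmead.com, rittman mead"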

You can then preview the feed live, or go back and sample historic data if you’re interested in loading old tweets rather than incoming new ones. Once you’re ready, you activate the feed, in my case by calling a URL using curl with a bunch of parameters (our API key and other sensitive data have been masked):

curl -X POST 'https://api.datasift.com/v1/push/create' \
-d 'name=connectormongodb' \
-d 'hash=65bd9dc4943ec426b04819exxxxxxxxx' \
-d 'output_type=mongodb' \
-d 'output_params.host=rittmandev.com' \
-d 'output_params.port=27017' \
-d 'output_params.use_ssl=no' \
-d 'output_params.verify_ssl=no' \
-d 'output_params.db_name=datasiftmongodb' \
-d 'output_params.collection_name=rm_tweets' \
-H 'Auth: rittmanmead:xxxxxxxxxxxxxxxxxxxxxxxxxxx'

The “hash” in the parameter list identifies the specific feed to activate, and the output type is MongoDB. The collection name is new, and will be created by MongoDB when the first tweet comes in; let’s run the curl command now, sit back for a while, and wait for some Twitter activity to arrive in MongoDB …

… and a couple of hours later, eight tweets have been captured by the DataSift filter, the most recent being one from Michael Rainey about his trip to tonight’s Seahawks game:

> db.rm_tweets.count()
8
> db.rm_tweets.findOne()
{
    "_id" : ObjectId("54089a879ad4ec99158b4d78"),
    "interactionId" : "1e43454b1a16a880e074e49c51369eac",
    "subscriptionId" : "f6cf211e03dca5da384786676c31fd3e",
    "hash" : "65bd9dc4943ec426b04819e6291ef1ce",
    "hashType" : "stream",
    "interaction" : {
        "demographic" : {
            "gender" : "male"
        },
        "interaction" : {
            "author" : {
                "avatar" : "http://pbs.twimg.com/profile_images/476898781821018113/YRkKyGDl_normal.jpeg",
                "id" : 14551637,
                "language" : "en",
                "link" : "https://twitter.com/mRainey",
                "name" : "Michael Rainey",
                "username" : "mRainey"
            },
            "content" : "Greyson and I will be ready for the @Seahawks game tonight! #GoHawks! #kickoff2014 #GBvsSEA http://t.co/4u16ziBhnD",
            "created_at" : "Thu, 04 Sep 2014 16:58:29 +0000",
            "hashtags" : [
                "GoHawks",
                "kickoff2014",
                "GBvsSEA"
            ],
            "id" : "1e43454b1a16a880e074e49c51369eac",
            "link" : "https://twitter.com/mRainey/status/507573423334100992",
            "mention_ids" : [
                23642374
            ],
            "mentions" : [
                "Seahawks"
            ],
            "received_at" : 1409849909.2967,
            "schema" : {
                "version" : 3
            },
            "source" : "Instagram",
            "type" : "twitter"
        },
        "language" : {
            "tag" : "en",
                "tag_extended" : "en",
            "confidence" : 98
        },
        "links" : {
            "code" : [
                200
            ],
            "created_at" : [
                "Thu, 04 Sep 2014 16:58:29 +0000"
            ],
            "meta" : {
                "charset" : [
                    "CP1252"
                ],
                "lang" : [
                    "en"
                ],
                "opengraph" : [
                    {
                        "description" : "mrainey's photo on Instagram",
                        "image" : "http://photos-d.ak.instagram.com/hphotos-ak-xfa1/10655141_1470641446544147_1761180844_n.jpg",
                        "site_name" : "Instagram",
                        "type" : "instapp:photo",
                        "url" : "http://instagram.com/p/sh_h6sQBYT/"
                    }
                ]
            },
            "normalized_url" : [
                "http://instagram.com/p/sh_h6sQBYT"
            ],
            "title" : [
                "Instagram"
            ],
            "url" : [
                "http://instagram.com/p/sh_h6sQBYT/"
            ]
        },
        "salience" : {
            "content" : {
                "sentiment" : 0,
                "topics" : [
                    {
                        "name" : "Video Games",
                        "hits" : 0,
                        "score" : 0.5354745388031,
                        "additional" : "Greyson and I will be ready for the @Seahawks game tonight!"
                    }
                ]
            }
        },
        "trends" : {
            "type" : [
                "San Jose",
                "United States"
            ],
            "content" : [
                "seahawks"
            ],
            "source" : [
                "twitter"
            ]
        },
        "twitter" : {
            "created_at" : "Thu, 04 Sep 2014 16:58:29 +0000",
            "display_urls" : [
                "instagram.com/p/sh_h6sQBYT/"
            ],
            "domains" : [
                "instagram.com"
            ],
            "filter_level" : "medium",
            "hashtags" : [
                "GoHawks",
                "kickoff2014",
                "GBvsSEA"
            ],
            "id" : "507573423334100992",
            "lang" : "en",
            "links" : [
                "http://instagram.com/p/sh_h6sQBYT/"
            ],
            "mention_ids" : [
                23642374
            ],
            "mentions" : [
                "Seahawks"
            ],
            "source" : "<a href=\"http://instagram.com\" rel=\"nofollow\">Instagram</a>",
            "text" : "Greyson and I will be ready for the @Seahawks game tonight! #GoHawks! #kickoff2014 #GBvsSEA http://t.co/4u16ziBhnD",
            "user" : {
                "created_at" : "Sat, 26 Apr 2008 21:18:01 +0000",
                "description" : "Data Integration (#ODI #GoldenGate #OBIA) consultant / blogger / speaker @RittmanMead.\nOracle ACE.\n#cycling #Seahawks #travel w/ @XiomaraRainey\n#GoCougs!",
                "favourites_count" : 746,
                "followers_count" : 486,
                "friends_count" : 349,
                "geo_enabled" : true,
                "id" : 14551637,
                "id_str" : "14551637",
                "lang" : "en",
                "listed_count" : 28,
                "location" : "Pasco, WA",
                "name" : "Michael Rainey",
                "profile_image_url" : "http://pbs.twimg.com/profile_images/476898781821018113/YRkKyGDl_normal.jpeg",
                "profile_image_url_https" : "https://pbs.twimg.com/profile_images/476898781821018113/YRkKyGDl_normal.jpeg",
                "screen_name" : "mRainey",
                "statuses_count" : 8549,
                "time_zone" : "Pacific Time (US & Canada)",
                "url" : "http://www.linkedin.com/in/rainey",
                "utc_offset" : -25200,
                "verified" : false
            }
        }
    }
}

If you’ve not looked at Twitter metadata before, it’s surprising how much accompanies what’s ostensibly a 140-character tweet. As well as details of the tweet itself, where it was sent from and which Twitter client sent it, there are details and statistics on the author - how many followers they’ve got and where they’re located - plus a list of all the other Twitter users mentioned in the tweet and any URLs and images referenced.
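
Before moving on to Pig, it’s worth noting that you can already reach into this nested structure from the mongo shell using dot notation - handy for quick sanity checks. For example, the (illustrative) query below pulls back just the tweet text for Michael’s tweets:

> // dot notation walks down through the nested document; the second argument is a projection
> db.rm_tweets.find({ "interaction.interaction.author.username" : "mRainey" }, { "interaction.interaction.content" : 1, "_id" : 0 })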

Not every tweet will use every element of metadata, and some tweets will repeat certain attributes - other Twitter users mentioned in the tweet, for example - as many times as there are mentions. This makes Twitter data a prime candidate for analysis using Pig and Spark, which easily handle nested data structures: tuples (ordered sets of fields, such as the attributes of an entity like “author”) and bags (unordered collections of values, such as the list of @mentions in a tweet).
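
As a rough illustration of that notation (the field names here are made up, not the real DataSift schema), a tweet-like relation might come back from Pig’s DESCRIBE command looking something like this, where the outer braces are the relation itself (a bag of tuples), author is a tuple and hashtags is a bag:

tweet: {id: chararray,
        author: (username: chararray, name: chararray, followers: int),
        hashtags: {(hashtag: chararray)}}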

There’s a MongoDB connector for Hadoop on GitHub which allows MapReduce to connect to MongoDB databases, running MapReduce jobs against MongoDB storage rather than HDFS (or S3, or whatever). This gives us the ability to use languages such as Pig and Hive to filter and aggregate our MongoDB data rather than MongoDB’s JavaScript API, which isn’t as fully-featured or scalable as MapReduce and has limitations on the number of documents you can include in aggregations. Let’s start, then, by connecting Pig to our MongoDB database.
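
One thing to note first: the connector’s jar files need to be on Pig’s classpath before the MongoLoader class can be found, either by setting PIG_CLASSPATH or by registering them at the top of your script or grunt session. A rough sketch - the jar names, versions and paths below are examples, so substitute the ones you actually downloaded:

-- register the MongoDB Java driver and the mongo-hadoop connector jars
-- (paths and version numbers are examples - use the versions you installed)
REGISTER /usr/lib/mongodb/mongo-java-driver-2.12.3.jar;
REGISTER /usr/lib/mongodb/mongo-hadoop-core-1.3.0.jar;
REGISTER /usr/lib/mongodb/mongo-hadoop-pig-1.3.0.jar;

With the connector jars registered, we can read in the documents with no Pig schema applied: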

grunt> tweets = LOAD 'mongodb://cdh51-node1:27017/datasiftmongodb.rm_tweets' using com.mongodb.hadoop.pig.MongoLoader;
2014-09-05 06:40:51,773 [main] INFO  com.mongodb.hadoop.pig.MongoStorage - Initializing MongoLoader in dynamic schema mode.
2014-09-05 06:40:51,838 [main] INFO  com.mongodb.hadoop.pig.MongoStorage - Initializing MongoLoader in dynamic schema mode.
grunt> tweets_count = FOREACH (GROUP tweets ALL) GENERATE COUNT (tweets);                                             
2014-09-05 06:41:07,772 [main] INFO  com.mongodb.hadoop.pig.MongoStorage - Initializing MongoLoader in dynamic schema mode.
2014-09-05 06:41:07,817 [main] INFO  com.mongodb.hadoop.pig.MongoStorage - Initializing MongoLoader in dynamic schema mode.
grunt> dump tweets_count
...
(9)
grunt>

So there are nine tweets in the MongoDB database now. Let’s take a look at one of them by creating a Pig alias containing just a single record.

grunt> tweets_limit_1 = LIMIT tweets 1;
2014-09-05 06:43:12,351 [main] INFO  com.mongodb.hadoop.pig.MongoStorage - Initializing MongoLoader in dynamic schema mode.
2014-09-05 06:43:12,443 [main] INFO  com.mongodb.hadoop.pig.MongoStorage - Initializing MongoLoader in dynamic schema mode.
grunt> dump tweets_limit_1
...
([interaction#{trends={source=(twitter), content=(seahawks), type=(San Jose,United States)}, twitter={filter_level=medium, text=Greyson and I will be ready for the @Seahawks game tonight! #GoHawks! #kickoff2014 #GBvsSEA http://t.co/4u16ziBhnD, mention_ids=(23642374), domains=(instagram.com), links=(http://instagram.com/p/sh_h6sQBYT/), lang=en, id=507573423334100992, source=<a href="http://instagram.com" rel="nofollow">Instagram</a>, created_at=Thu, 04 Sep 2014 16:58:29 +0000, hashtags=(GoHawks,kickoff2014,GBvsSEA), mentions=(Seahawks), user={profile_image_url_https=https://pbs.twimg.com/profile_images/476898781821018113/YRkKyGDl_normal.jpeg, location=Pasco, WA, geo_enabled=true, statuses_count=8549, lang=en, url=http://www.linkedin.com/in/rainey, utc_offset=-25200, id=14551637, time_zone=Pacific Time (US & Canada), favourites_count=746, verified=false, friends_count=349, description=Data Integration (#ODI #GoldenGate #OBIA) consultant / blogger / speaker @RittmanMead.
Oracle ACE.
#cycling #Seahawks #travel w/ @XiomaraRainey
#GoCougs!, name=Michael Rainey, created_at=Sat, 26 Apr 2008 21:18:01 +0000, screen_name=mRainey, id_str=14551637, profile_image_url=http://pbs.twimg.com/profile_images/476898781821018113/YRkKyGDl_normal.jpeg, followers_count=486, listed_count=28}, display_urls=(instagram.com/p/sh_h6sQBYT/)}, salience={content={topics=([score#0.5354745388031,additional#Greyson and I will be ready for the @Seahawks game tonight!,hits#0,name#Video Games]), sentiment=0}}, links={created_at=(Thu, 04 Sep 2014 16:58:29 +0000), title=(Instagram), code=(200), normalized_url=(http://instagram.com/p/sh_h6sQBYT), url=(http://instagram.com/p/sh_h6sQBYT/), meta={lang=(en), charset=(CP1252), opengraph=([image#http://photos-d.ak.instagram.com/hphotos-ak-xfa1/10655141_1470641446544147_1761180844_n.jpg,type#instapp:photo,site_name#Instagram,url#http://instagram.com/p/sh_h6sQBYT/,description#mrainey's photo on Instagram])}}, interaction={schema={version=3}, id=1e43454b1a16a880e074e49c51369eac, content=Greyson and I will be ready for the @Seahawks game tonight! #GoHawks! #kickoff2014 #GBvsSEA http://t.co/4u16ziBhnD, author={id=14551637, username=mRainey, language=en, avatar=http://pbs.twimg.com/profile_images/476898781821018113/YRkKyGDl_normal.jpeg, name=Michael Rainey, link=https://twitter.com/mRainey}, received_at=1.4098499092967E9, source=Instagram, mention_ids=(23642374), link=https://twitter.com/mRainey/status/507573423334100992, created_at=Thu, 04 Sep 2014 16:58:29 +0000, hashtags=(GoHawks,kickoff2014,GBvsSEA), type=twitter, mentions=(Seahawks)}, language={tag=en, confidence=98, tag_extended=en}, demographic={gender=male}},interactionId#1e43454b1a16a880e074e49c51369eac,_id#54089a879ad4ec99158b4d78,hash#65bd9dc4943ec426b04819e6291ef1ce,subscriptionId#f6cf211e03dca5da384786676c31fd3e,hashType#stream])

And there’s Michael’s tweet again, with all the attributes from the MongoDB JSON document appended together into a single record. In this format, though, the data isn’t all that useful as we can’t easily access individual elements of the Twitter record; it would be better to apply a Pig schema definition to the LOAD statement, using the document field listing we saw when we displayed a single record from the MongoDB collection earlier.

I can start by referencing the document fields that map to simple Pig datatypes; id and interactionId, for example:

grunt> tweets = LOAD 'mongodb://cdh51-node1:27017/datasiftmongodb.my_first_test' using com.mongodb.hadoop.pig.MongoLoader('id:chararray,interactionId:chararray','id');
2014-09-05 06:57:57,985 [main] INFO  com.mongodb.hadoop.pig.MongoStorage - Initializing MongoLoader in dynamic schema mode.
2014-09-05 06:57:58,022 [main] INFO  com.mongodb.hadoop.pig.MongoStorage - Initializing MongoLoader in dynamic schema mode.
grunt> describe tweets
2014-09-05 06:58:11,611 [main] INFO  com.mongodb.hadoop.pig.MongoStorage - Initializing MongoLoader in dynamic schema mode.
tweets: {id: chararray,interactionId: chararray}
grunt> tweets_limit_1 = LIMIT tweets 1;
...
(53fae22e9ad4ec93658b513e,1e42c2747542a100e074fff55100414a)
grunt>

Where the MongoDB document has fields nested within other fields, you can reference them as a tuple if they’re a set of attributes under a common header, or as a bag if they’re just a list of values for a single attribute. For example, the “username” field is contained within the author tuple, which in turn is contained within the interaction tuple; so to count tweets by author I first need to flatten the author tuple to turn its fields into scalar fields, then project out the username and other details. From there I can group the relation in the normal way on those author details and generate a count of tweets, like this:

grunt> tweets = LOAD 'mongodb://cdh51-node1:27017/datasiftmongodb.rm_tweets' using com.mongodb.hadoop.pig.MongoLoader('id:chararray,interactionId:chararray,interaction:tuple(interaction:tuple(author:tuple(id:int,language:chararray,link:chararray,name:chararray,username:chararray)))','id');
grunt> tweets_author_tuple_flattened = FOREACH tweets GENERATE id, FLATTEN(interaction.$0);                                            
grunt> tweets_with_authors = FOREACH tweets_author_tuple_flattened GENERATE id, interaction::author.username, interaction::author.name;
grunt> tweets_author_group = GROUP tweets_with_authors by username; 
grunt> tweets_author_count = FOREACH tweets_author_group GENERATE group, COUNT(tweets_with_authors); 
...
(rmoff,1)
(dw_pete,1)
(mRainey,3)
(P_J_FLYNN,3)
(davidhuey,7)
(EdelweissK,1)
(JamesOickle,3)
(markrittman,3)
(rittmanmead,2)
(RedgraveChris,1)
grunt>
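
If you wanted to keep that aggregate around for downstream use - for instance for the ODI and Hive work in the next article - a simple STORE back to HDFS would do the job; a sketch, with a made-up output path:

-- persist the per-author tweet counts to HDFS as CSV (the output path is just an example)
STORE tweets_author_count INTO '/user/mrittman/tweet_author_counts' USING PigStorage(',');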

So there’s obviously a lot more we can do with the Twitter dataset as it stands, but where it’ll get really interesting is combining this with other social media interaction data - for example from Facebook, LinkedIn and so on - and then correlating that with our main site activity data. Check back in a few days, when we’ll be covering this second stage in a further blog article, using ODI12c to orchestrate the process.