Analyzing Wimbledon Twitter Feeds in Real Time with Kafka, Presto and Oracle DVD v3

Wimbledon took place last week. If you are a fan of Federer, Nadal or Djokovic, it was one of the events not to be missed. I deliberately excluded Andy Murray from the list since he knocked out my favourite player: Dustin Brown.

Dustin Brown

Two weeks ago I was at Kscope17, and one of the common themes, which reflects where the industry is going, was the use of Kafka as the central hub for all data pipelines. I won't go into detail on Kafka's specific role and how it fulfils it; you can grab the idea from two slides taken from a recent presentation by Confluent.

Kafka

One of the key points of all Kafka-related discussions at Kscope was that Kafka is widely used to take data from providers and push it to specific data stores (like HDFS) that are then queried by analytical tools. However, the "parking to data store" step can sometimes be omitted, with analytical tools querying Kafka directly for real-time analytics.

Park

At the beginning of the year we wrote a blog post about doing this with Spark Streaming and Python; however, that setup was more data-scientist oriented and didn't provide the simple, familiar ANSI SQL interface that our beloved end users expect.

As usual, Oracle announced a new release during Kscope. This year it was Oracle Data Visualization Desktop 12.2.3.0.0, with a bunch of new features covered in my previous blog post.
The enhancement that made my day, amongst others, was the support for JDBC and ODBC drivers. It opened up a whole range of opportunities to query tools that are not officially supported by DVD but expose those types of connectors.

One of the tools that fits in this category is Presto, a distributed query engine belonging to the same family as Impala and Drill, commonly referred to as SQL-on-Hadoop. A big plus of this tool, compared to the other two, is that it queries Kafka natively via a dedicated connector.

I then found a way of fitting two of the main Kscope17 topics, a new SQL-on-Hadoop tool and one of my favourite sports (tennis) into the same blog post: analysing real-time Twitter feeds with Kafka, Presto and Oracle DVD v3. Not bad as an idea... let's check if it works!

Analysing Twitter Feeds

Let's start from the actual fun: analysing the tweets! We can navigate to the Oracle Analytics Store and download some interesting add-ins we'll use: the Auto Refresh plugin that enables the refresh of the DV project, the Heat Map and Circle Pack visualizations and the Term Frequency advanced analytics pack.

Importing the plugin and the new visualizations can be done directly in the console, as explained in my previous post. To use the advanced analytics function we need to unzip the related file and move the .xml file it contains to the %INSTALL_DIR%\OracleBI1\bifoundation\advanced_analytics\script_repository folder. The Advanced Analytics zip file also contains a .dva project that we can import into DVD (password Admin123), which gives us a hint on how to use the function.

We can now build a DVD project about the Wimbledon gentlemen's singles final containing:

  • A table view showing the latest tweets
  • A horizontal bar chart showing the number of tweets mentioning Federer, Cilic or both
  • A circle view showing the most tweeted terms
  • A heatmap showing tweet locations (only for tweets with location enabled)
  • A line chart showing the number of tweets over time

The project is automatically refreshed using the auto-refresh plugin mentioned above. A quick view of the result is provided by the following image.

Presto Output

So far all good and simple! Now it's time to go back and check how the data is collected and queried. Let's start from Step #1: pushing Twitter data to Kafka!

Kafka

We covered Kafka installation and setup in a previous blog post, so I won't repeat that part.
The only piece I want to mention, since it gave me trouble, is the advertised.host.name setting: it's a configuration line in /opt/kafka*/config/server.properties that sets the hostname the broker advertises to producers and consumers.

If it is left pointing at localhost and you try to push content to a topic from an external machine, the content will not show up, so as a prerequisite change it to a hostname/IP that can be resolved externally.
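A quick sanity check for this setting can be sketched in Python. This is only an illustration under assumptions: the sample text stands in for a real server.properties file, and the helper names `parse_properties` and `is_externally_usable` are made up for this post.

```python
import ipaddress


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def is_externally_usable(host):
    """Return False for an empty host, 'localhost' or a loopback IP."""
    if not host or host.lower() == "localhost":
        return False
    try:
        return not ipaddress.ip_address(host).is_loopback
    except ValueError:
        # Not an IP literal: assume it is a hostname that clients can resolve.
        return True


# Hypothetical excerpt of config/server.properties
sample = """
broker.id=0
advertised.host.name=localhost
"""
props = parse_properties(sample)
print(is_externally_usable(props.get("advertised.host.name")))
```

The check cannot tell whether a hostname really resolves from the client machine; it only catches the obvious localhost and loopback cases.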

The rest of the Kafka setup is the creation of a Twitter producer. I took this Java project as an example and changed it to use the latest Kafka release available in Maven. It allowed me to create a Kafka topic named rm.wimbledon storing tweets containing the word Wimbledon.
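The Java project itself is not reproduced here, but the core of such a producer (keep the tweets that mention the keyword, serialise them as JSON, send them to the topic) can be sketched in Python. Everything below is an assumption made for illustration: the `send` callable stands in for a real Kafka producer, and the two sample tweets are invented.

```python
import json

KEYWORD = "wimbledon"  # matched case-insensitively in this sketch


def matches(tweet):
    """True when the tweet text mentions the keyword."""
    return KEYWORD in tweet.get("text", "").lower()


def to_value(tweet):
    """Serialise a tweet as UTF-8 encoded JSON, the format stored in the topic."""
    return json.dumps(tweet, ensure_ascii=False).encode("utf-8")


def forward(tweets, send, topic="rm.wimbledon"):
    """Pass every matching tweet to send(topic, value); return how many were sent."""
    sent = 0
    for tweet in tweets:
        if matches(tweet):
            send(topic, to_value(tweet))
            sent += 1
    return sent


# Stand-in for a real Kafka producer: collect the records in a list.
outbox = []
tweets = [
    {"id": 1, "text": "Federer into the #Wimbledon final"},
    {"id": 2, "text": "Nothing about tennis here"},
]
count = forward(tweets, lambda topic, value: outbox.append((topic, value)))
print(count, "tweet(s) forwarded")
```

In a real setup the lambda would be replaced by the send method of an actual Kafka client.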

The same output could be achieved using Kafka Connect and its source and sink connectors for Twitter. Kafka Connect also has the benefit of being able to transform the data before landing it in Kafka, making the data easier to parse and faster to retrieve. I'll cover the usage of Kafka Connect in a future post; for more information about it, check this presentation from Robin Moffatt of Confluent.

One final note about Kafka: I ran a command to limit the retention to five minutes (300000 ms):

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic rm.wimbledon --config retention.ms=300000

This limits the amount of data that is kept in Kafka, providing better performance at query time. Such a short retention is not always possible due to data collection needs, and there are other ways of optimizing the query if necessary.
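The retention.ms value is expressed in milliseconds, so five minutes is 5 × 60 × 1000 = 300000. A small Python sketch of the conversion, which also rebuilds the argument list of the command used above (the helper names are made up for illustration):

```python
def retention_ms(minutes):
    """Convert a retention window in minutes to the milliseconds Kafka expects."""
    return int(minutes * 60 * 1000)


def alter_retention_command(topic, minutes, zookeeper="localhost:2181"):
    """Build the argument list for the kafka-topics.sh call used in this post."""
    return [
        "bin/kafka-topics.sh",
        "--zookeeper", zookeeper,
        "--alter",
        "--topic", topic,
        "--config", f"retention.ms={retention_ms(minutes)}",
    ]


print(" ".join(alter_retention_command("rm.wimbledon", 5)))
```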

At this point of our project we have a dataflow from Twitter to Kafka, but no known way of querying it with DVD. It's time to introduce the query engine: Presto!

Presto

Presto was developed at Facebook and belongs to the family of SQL-on-Hadoop tools. However, like Apache Drill, it could be called SQL-on-everything, since the data doesn't need to reside on a Hadoop system. Presto can query local file systems, MongoDB, Hive, and a wide variety of other data sources.

Like the other SQL-on-Hadoop technologies, it works with always-on daemons, which avoids the latency Hive incurs when starting a MapReduce job. Unlike the others, Presto divides the daemons into two types: the Coordinator and the Worker. A Coordinator is a node that receives queries from clients, then analyses and plans the execution, which is passed on to the Workers to carry out.

In other tools like Impala and Drill, every node can by default act as both worker and receiver. The same can also happen in Presto, but it is not the default, and the documentation suggests dedicating a single machine to coordination tasks only for best performance in large clusters (reference to the doc).

The following image, taken from the Presto website, explains the flow when the Hive metastore is used as the data source.

Presto overview

Installation

The default Presto installation procedure is pretty simple and can be found in the official documentation. We just need to download the presto-server-0.180.tar.gz tarball and unpack it.

tar -xvf presto-server-0.180.tar.gz

This creates a folder named presto-server-0.180, which is the installation directory. The next step is to create a subfolder named etc, which contains the configuration settings.

Then we need to create four configuration files and a folder within the etc folder:

  • node.properties: configuration specific to each node, enables the configuration of a cluster
  • jvm.config: options for the Java Virtual Machine
  • config.properties: specific coordinator/worker settings
  • log.properties: specifies log levels
  • catalog: a folder that will contain the data source definition

For the basic functionality we need the following configurations:

node.properties

node.environment=production
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.data-dir=/var/presto/data

The environment parameter is shared across all the nodes in the cluster, id is a unique identifier of the node, and data-dir is the location where Presto will store logs and data.
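Since each node needs its own identifier, one way to obtain one is to generate a random UUID once and keep it across restarts. The following Python sketch renders the three settings discussed above; the function name `node_properties` is hypothetical, and this is not an official Presto tool.

```python
import uuid


def node_properties(environment, data_dir, node_id=None):
    """Render the node settings; a random UUID is used when no node_id is given."""
    node_id = node_id or str(uuid.uuid4())
    return (
        f"node.environment={environment}\n"
        f"node.id={node_id}\n"
        f"node.data-dir={data_dir}\n"
    )


text = node_properties("production", "/var/presto/data")
print(text)
```

The generated text would be saved once per node; regenerating it at every start would change the node id each time.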

jvm.config

-server
-Xmx4G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError

I reduced the -Xmx parameter to 4GB as I'm running in a test VM. The parameters can of course be changed as needed.

config.properties

Since we want to keep it simple, we'll create a single node acting as both coordinator and worker. The related config file is:

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=5GB
query.max-memory-per-node=1GB
discovery-server.enabled=true
discovery.uri=http://linuxsrv.local.com:8080

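Here coordinator=true tells Presto to function as coordinator, node-scheduler.include-coordinator=true allows work to be scheduled on the coordinator itself, discovery-server.enabled=true starts the embedded Discovery service, http-server.http.port defines the HTTP port, and discovery.uri is the URI of the Discovery server (in this case the same process).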

log.properties

com.facebook.presto=INFO

We can keep the default INFO level; the other levels are DEBUG, WARN and ERROR.

catalog

The last step in the configuration is the datasource setting: we need to create a folder named catalog within etc and create a file for each connection we intend to use.

For the purpose of this post we want to connect to the Kafka topic named rm.wimbledon. We need to create a file named kafka.properties within the catalog folder created above. The file contains the following lines

connector.name=kafka
kafka.nodes=linuxsrv.local.com:9092
kafka.table-names=rm.wimbledon
kafka.hide-internal-columns=false

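where kafka.nodes points to the Kafka brokers and kafka.table-names defines the comma-separated list of topics. Setting kafka.hide-internal-columns to false makes the connector's internal columns (such as _message) show up in DESCRIBE and SELECT * output; they are hidden by default, although still usable in queries.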

The last bit needed is to start the Presto server by executing

bin/launcher start

To debug the installation we can append the --verbose parameter; the logs can be found in the var/log folder.

Presto Command Line Client

In order to query Presto via the command line interface we just need to download the associated client (see official doc), which comes as a presto-cli-0.180-executable.jar file. We can now rename the file to presto and make it executable.

mv presto-cli-0.180-executable.jar presto
chmod +x presto

Then we can start the client by executing

./presto --server linuxsrv.local.com:8080 --catalog kafka --schema rm

Remember that the client requires Java 8 (JDK 1.8); otherwise you will face an error. Once the client is successfully set up, we can start querying Kafka.

You may notice that the schema (rm) we're connecting to is just the prefix of the rm.wimbledon topic used in Kafka. In this way I could store other topics using the same rm prefix and be able to query them all together.
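The naming convention can be modelled in a few lines of Python: the part before the first dot becomes the schema and the rest becomes the table, while a topic without a dot falls into a schema called default. This is only a rough model of the behaviour described here, not the connector's code, and the topics rm.usopen and clicks are hypothetical.

```python
def split_topic(name, default_schema="default"):
    """Split 'schema.table' on the first dot; no dot means the default schema."""
    schema, dot, table = name.partition(".")
    if not dot:
        return default_schema, name
    return schema, table


def tables_by_schema(table_names):
    """Group a comma-separated kafka.table-names value by schema."""
    grouped = {}
    for raw in table_names.split(","):
        name = raw.strip()
        if name:
            schema, table = split_topic(name)
            grouped.setdefault(schema, []).append(table)
    return grouped


# "rm.usopen" and "clicks" are invented topics, added only for illustration.
print(tables_by_schema("rm.wimbledon, rm.usopen, clicks"))
```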

We can check which schemas are available in the kafka catalog with

presto:rm> show schemas;
       Schema       
--------------------
 information_schema 
 rm                 
(2 rows)

We can also check which topics are contained in the rm schema by executing

presto:rm> show tables;
   Table   
-----------
 wimbledon 
(1 row)

or change schema by executing

use information_schema;

Going back to the Wimbledon example we can describe the content of the topic by executing

presto:rm> describe wimbledon;
      Column       |  Type   | Extra |                   Comment                   
-------------------+---------+-------+---------------------------------------------
 _partition_id     | bigint  |       | Partition Id                                
 _partition_offset | bigint  |       | Offset for the message within the partition 
 _segment_start    | bigint  |       | Segment start offset                        
 _segment_end      | bigint  |       | Segment end offset                          
 _segment_count    | bigint  |       | Running message count per segment           
 _key              | varchar |       | Key text                                    
 _key_corrupt      | boolean |       | Key data is corrupt                         
 _key_length       | bigint  |       | Total number of key bytes                   
 _message          | varchar |       | Message text                                
 _message_corrupt  | boolean |       | Message data is corrupt                     
 _message_length   | bigint  |       | Total number of message bytes               
(11 rows)

We can immediately start querying it like

presto:rm> select count(*) from wimbledon;
 _col0 
-------
 42295 
(1 row)

Query 20170713_102300_00023_5achx, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [27 rows, 195KB] [157 rows/s, 1.11MB/s]

Remember that all the queries run against Kafka in real time, so the more messages we push, the more results we'll have available. Let's now check what the messages look like:

presto:rm> SELECT _message FROM wimbledon LIMIT 5;
                                                                                                                                                                 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
 {"created_at":"Thu Jul 13 10:22:46 +0000 2017","id":885444381767081984,"id_str":"885444381767081984","text":"RT @paganrunes: Ian McKellen e Maggie Smith a Wimbl
                                                                                                                                                                 
 {"created_at":"Thu Jul 13 10:22:46 +0000 2017","id":885444381913882626,"id_str":"885444381913882626","text":"@tomasberdych spricht vor dem @Wimbledon-Halbfinal 
                                                                                                                                                                 
 {"created_at":"Thu Jul 13 10:22:47 +0000 2017","id":885444388645740548,"id_str":"885444388645740548","text":"RT @_JamieMac_: Sir Andrew Murray is NOT amused wit
                                                                                                                                                                 
 {"created_at":"Thu Jul 13 10:22:49 +0000 2017","id":885444394404503553,"id_str":"885444394404503553","text":"RT @IBM_UK_news: What does it take to be a #Wimbled
                                                                                                                                                                 
 {"created_at":"Thu Jul 13 10:22:50 +0000 2017","id":885444398929989632,"id_str":"885444398929989632","text":"RT @PakkaTollywood: Roger Federer Into Semifinals \
                                                                                                                                                                 
(5 rows)

As expected, tweets are stored in JSON format. We can now use the Presto JSON functions (see "JSON Functions and Operators" in the Presto documentation) to extract the relevant information from them. In the following query we're extracting the user.name part of every tweet. Note the LIMIT 10 (common among all the SQL-on-Hadoop technologies), which limits the number of rows returned.

presto:rm> SELECT json_extract_scalar(_message, '$.user.name') FROM wimbledon LIMIT 10;
        _col0        
---------------------
 pietre --           
 BLICK Sport         
 Neens               
 Hugh Leonard        
 ••••Teju KaLion•••• 
 Charlie Murray      
 Alex                
 The Daft Duck.      
 Hotstar             
 Raj Singh Chandel   
(10 rows)
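To make clear what the '$.user.name' path selects, here is a rough Python equivalent of the extraction for simple dotted paths. It is a sketch of the idea, not Presto's implementation: it returns the Python value rather than a varchar, and the sample message is a heavily trimmed, invented tweet.

```python
import json


def extract_scalar(message, path):
    """Follow a dotted path such as 'user.name'; return None if the path is
    missing or does not end on a scalar value."""
    node = json.loads(message)
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    if isinstance(node, (dict, list)):
        return None
    return node


# Hypothetical, trimmed message; real tweets carry many more fields.
message = '{"id": 885444381767081984, "text": "RT ...", "user": {"name": "Alex"}}'
print(extract_scalar(message, "user.name"))
```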

We can also create summaries like the top 10 users by number of tweets.

presto:rm> SELECT json_extract_scalar(_message, '$.user.name') as screen_name, count(json_extract_scalar(_message, '$.id')) as nr FROM wimbledon GROUP BY json_extract_scalar(_message, '$.user.name') ORDER BY count(json_extract_scalar(_message, '$.id')) desc LIMIT 10;
     screen_name     | nr  
---------------------+-----
 Evarie Balan        | 125 
 The Master Mind     | 104 
 Oracle Betting      |  98 
 Nichole             |  85 
 The K - Man         |  75 
 Kaciekulasekran     |  73 
 vientrainera        |  72 
 Deporte Esp         |  66 
 Lucas Mc Corquodale |  64 
 Amal                |  60 
(10 rows)
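The same aggregation can be pictured in plain Python as a counter keyed by user name. This is a minimal sketch with invented messages, only meant to show what the GROUP BY, ORDER BY and LIMIT combination computes.

```python
import json
from collections import Counter


def top_users(messages, n=10):
    """Count tweets per user.name and return the n most frequent (name, count) pairs."""
    counts = Counter()
    for message in messages:
        name = json.loads(message).get("user", {}).get("name")
        counts[name] += 1
    return counts.most_common(n)


# Invented sample messages, trimmed to the fields used here.
messages = [
    '{"id": 1, "user": {"name": "Alex"}}',
    '{"id": 2, "user": {"name": "Neens"}}',
    '{"id": 3, "user": {"name": "Alex"}}',
]
for name, nr in top_users(messages, 2):
    print(name, nr)
```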

Adding a Description file

We saw above that it's possible to query the topic with ANSI SQL statements using the Presto JSON functions. The next step is to define a structure on top of the data stored in the Kafka topic to turn the raw data into a table format. We can achieve this by writing a topic description file. The file must be in JSON format, have a .json extension, and be stored under the etc/kafka folder; it is recommended, but not necessary, that the name of the file matches the Kafka topic (in our case rm.wimbledon). The file in our case would be the following:

{
    "tableName": "wimbledon",
    "schemaName": "rm",
    "topicName": "rm.wimbledon",
    "key": {
        "dataFormat": "raw",
        "fields": [
            {
                "name": "kafka_key",
                "dataFormat": "LONG",
                "type": "BIGINT",
                "hidden": "false"
            }
        ]
    },
    "message": {
        "dataFormat": "json",
        "fields": [
            {
                "name": "created_at",
                "mapping": "created_at",
                "type": "TIMESTAMP",
                "dataFormat": "rfc2822"
            },
            {
                "name": "tweet_id",
                "mapping": "id",
                "type": "BIGINT"
            },
            {
                "name": "tweet_text",
                "mapping": "text",
                "type": "VARCHAR"
            },
            {
                "name": "user_id",
                "mapping": "user/id",
                "type": "VARCHAR"
            },
            {
                "name": "user_name",
                "mapping": "user/name",
                "type": "VARCHAR"
            },
            [...]
        ]
    }
}
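To illustrate what the description file expresses, the Python sketch below applies three of the mappings above to a single message: slash-separated mappings such as user/name are followed through the nested JSON, and created_at is parsed using the layout Twitter emits. It is an approximation of the idea rather than the connector's decoder, and the sample message (including the user id 42) is invented.

```python
import json
from datetime import datetime

# A subset of the field definitions from the description file above.
FIELDS = [
    {"name": "created_at", "mapping": "created_at", "type": "TIMESTAMP"},
    {"name": "tweet_id", "mapping": "id", "type": "BIGINT"},
    {"name": "user_name", "mapping": "user/name", "type": "VARCHAR"},
]

# Twitter's created_at layout, e.g. "Thu Jul 13 10:22:46 +0000 2017".
TWITTER_TIME = "%a %b %d %H:%M:%S %z %Y"


def lookup(doc, mapping):
    """Walk a slash-separated mapping such as 'user/name'; None if absent."""
    node = doc
    for part in mapping.split("/"):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def to_row(message):
    """Turn one JSON message into a dict keyed by column name."""
    doc = json.loads(message)
    row = {}
    for field in FIELDS:
        value = lookup(doc, field["mapping"])
        if value is not None and field["type"] == "TIMESTAMP":
            value = datetime.strptime(value, TWITTER_TIME)
        elif value is not None and field["type"] == "VARCHAR":
            value = str(value)
        row[field["name"]] = value
    return row


# Invented, trimmed message for illustration.
message = ('{"created_at": "Thu Jul 13 10:22:46 +0000 2017", '
           '"id": 885444381767081984, "user": {"id": 42, "name": "Alex"}}')
print(to_row(message))
```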

After restarting Presto, when we execute the DESCRIBE operation we can see all the fields available.

presto:rm> describe wimbledon;
      Column       |   Type    | Extra |                   Comment                   
-------------------+-----------+-------+---------------------------------------------
 kafka_key         | bigint    |       |                                             
 created_at        | timestamp |       |                                             
 tweet_id          | bigint    |       |                                             
 tweet_text        | varchar   |       |                                             
 user_id           | varchar   |       |                                             
 user_name         | varchar   |       |                                             
 user_screenname   | varchar   |       |                                             
 user_location     | varchar   |       |                                             
 user_followers    | bigint    |       |                                             
 user_time_zone    | varchar   |       |                                             
 _partition_id     | bigint    |       | Partition Id                                
 _partition_offset | bigint    |       | Offset for the message within the partition 
 _segment_start    | bigint    |       | Segment start offset                        
 _segment_end      | bigint    |       | Segment end offset                          
 _segment_count    | bigint    |       | Running message count per segment           
 _key              | varchar   |       | Key text                                    
 _key_corrupt      | boolean   |       | Key data is corrupt                         
 _key_length       | bigint    |       | Total number of key bytes                   
 _message          | varchar   |       | Message text                                
 _message_corrupt  | boolean   |       | Message data is corrupt                     
 _message_length   | bigint    |       | Total number of message bytes               
(21 rows)

Now I can use the newly defined columns in my query

presto:rm> select created_at, user_name, tweet_text from wimbledon LIMIT 10;

and the related results

Table Results

We can always mix defined columns with Presto's JSON parsing syntax if we need to extract some other fields.

select created_at, user_name, json_extract_scalar(_message, '$.user.default_profile') from wimbledon LIMIT 10;

Oracle Data Visualization Desktop

As mentioned at the beginning of the article, the overall goal was to analyse the Wimbledon Twitter feed in real time with Oracle Data Visualization Desktop via JDBC, so let's complete the picture!

JDBC drivers

The first step is to download the Presto JDBC driver, version 0.175; I found it on the Maven website. I also tried the 0.180 version, downloadable directly from the Presto website, but I had several connection errors.
After downloading, we need to copy the driver presto-jdbc-0.175.jar into the %INSTALL_DIR%\lib folder, where %INSTALL_DIR% is the Oracle DVD installation folder, and start DVD. Then I just need to create a new connection like the following:

Presto Connection

Note that:

  • URL: it also includes the /kafka suffix, which tells Presto which catalog I want to query
  • Driver Class Name: this setting puzzled me a little bit. I was able to discover the string (with the help of Gianni Ceresa) by unpacking the jar file and concatenating the folder path and the driver class name, which gives com.facebook.presto.jdbc.PrestoDriver

Jar file

  • Username/password: those strings can be anything, since for this basic test we didn't set up any security on Presto.
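The URL from the list above follows the pattern jdbc:presto://host:port/catalog, optionally followed by a schema. A tiny Python helper makes the structure explicit; the function name is made up, and the host and port are the ones used throughout this post.

```python
def presto_jdbc_url(host, port, catalog, schema=None):
    """Build jdbc:presto://host:port/catalog, with an optional /schema suffix."""
    url = f"jdbc:presto://{host}:{port}/{catalog}"
    if schema:
        url += f"/{schema}"
    return url


print(presto_jdbc_url("linuxsrv.local.com", 8080, "kafka"))
```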

The whole JDBC setup process is described in this YouTube video provided by Oracle.

We can then define the source by just selecting the columns we want to import and creating a few additional ones, like the Lat and Long parsed from the coordinates column, which is in the form [Lat, Long]. The dataset is now ready to be analysed as we saw at the beginning of the article, with the final result being:

Presto Output
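The Lat and Long split mentioned above was done with DVD formulas; as an illustration of the same transformation, here is a Python sketch that parses a string in the [Lat, Long] form. The function name and the sample coordinates are assumptions, and the element order simply follows the form stated above.

```python
import json


def split_coordinates(value):
    """Parse a '[Lat, Long]' string into a (lat, long) pair of floats.
    Returns (None, None) when the value is empty or not a two-element list."""
    if not value:
        return None, None
    try:
        parsed = json.loads(value)
    except ValueError:
        return None, None
    if not isinstance(parsed, list) or len(parsed) != 2:
        return None, None
    try:
        return float(parsed[0]), float(parsed[1])
    except (TypeError, ValueError):
        return None, None


# Invented sample value, roughly in south-west London.
print(split_coordinates("[51.4344, -0.2145]"))
```

Tweets without an enabled location yield (None, None), so they can simply be left out of the heatmap.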

Conclusions

As we can see from the above picture, the whole process works (phew...). However, it has some limitations: there is no pushdown of functions to the source, so most of the queries we see against Presto are in the form of

select tweet_text, tweet_id, user_name, created_at from (
select coordinates, 
 coordinates_lat_long,
 created_at,
 tweet_id,
 tweet_text,
 user_followers,
 user_id,
 user_location,
 user_name,
 user_screenname,
 user_time_zone
from rm.wimbledon)

This means that the whole dataset is retrieved every time, making this solution far from optimal for big volumes of data. In those cases the "parking" to data store step would probably be necessary. Another limitation is related to the transformations: the Lat and Long extraction from the coordinates field, along with other column transformations, is done directly in DVD, meaning that the formulas are applied in the visualization phase. In the second post we'll see how the source parsing phase and the query performance can be enhanced using Kafka Connect, the framework that allows easy integration between Kafka and other sources or sinks.

One last word: winning Wimbledon eight times, fourteen years after the first victory and five years after the previous one, is something impressive! Chapeau, Mr Federer!