KSQL: Streaming SQL for Apache Kafka

A few weeks back, while I was enjoying my holidays in the south of Italy, I started receiving notifications about an imminent announcement by Confluent. Reading the highlights almost (...I said almost) made me want to go straight back to work and check all the details.
The announcement regarded KSQL: a streaming SQL engine for Apache Kafka!

Before going into detail, let's clarify the basics: what is KSQL? Why was it introduced and how does it complement Kafka?

What is KSQL?

We have written about Kafka several times, including my recent blog posts where I used it as a data hub to capture Game of Thrones tweets and store them in BigQuery in order to do sentiment analysis with Tableau. In all our examples Kafka has been used just for data transportation, with any necessary transformation happening in the target datastore like BigQuery, through languages like Python and engines like Spark Streaming, or directly in the querying tool like Presto.

KSQL enables something really effective: reading, writing and transforming data in real time and at scale, using a syntax already known by the majority of the community working in the data space: SQL!

KSQL image

KSQL is currently available as a developer preview, but basic operations like joins, aggregations and event-time windowing are already covered.

What Problem is KSQL Solving?

As anticipated before, KSQL solves the main problem of providing a SQL interface over Kafka, without the need for external languages like Python or Java.
However, one could argue that the same problem was already solved by the ETL operations run on target datastores like Oracle Database or BigQuery. What is the difference in KSQL's approach, then? What are the benefits?

The main difference in my opinion is the concept of continuous queries: with KSQL, transformations are done continuously as new data arrives in the Kafka topic. Transformations done in a database (or in big data platforms like BigQuery), on the other hand, are one-off: if new data arrives, the same transformation has to be executed again.
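To make the contrast concrete, here is a minimal sketch in plain Python (not KSQL). The function names and the sample tweets are made up for illustration; the only point is that the one-off version has to be re-run over everything, while the continuous version is started once and emits a result per incoming event.

```python
from typing import Iterable, Iterator, List


def one_off_uppercase(batch: List[str]) -> List[str]:
    """One-off transformation: processes whatever is stored right now."""
    return [text.upper() for text in batch]


def continuous_uppercase(events: Iterable[str]) -> Iterator[str]:
    """Continuous transformation: yields a result as each event arrives."""
    for text in events:
        yield text.upper()


# One-off: when new data lands, the whole transformation is executed again.
stored = ["first tweet", "second tweet"]
first_run = one_off_uppercase(stored)
stored.append("third tweet")
second_run = one_off_uppercase(stored)  # reprocesses all three rows

# Continuous: the query is started once and keeps consuming its source.
incoming = iter(["first tweet", "second tweet", "third tweet"])
continuous_results = list(continuous_uppercase(incoming))
```

In a real deployment the source would be an unbounded Kafka topic rather than a short list, so the continuous version would never "finish".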

Tweet Flow

So what is KSQL good for? Confluent's KSQL introduction blog post provides some use cases like real-time analytics, security and anomaly detection, online data integration or general application development. From a generic point of view KSQL is what you should use when transformations, integrations and analytics need to happen on the fly during the data stream. KSQL provides a way of keeping Kafka as the single data hub: no need to take data out, transform it and re-insert it in Kafka. Every transformation can be done in Kafka using SQL!

As mentioned before, KSQL is currently available as a developer preview and the feature/function list is somewhat limited compared to more mature SQL products. However, cases where very complex transformations need to happen can still be solved either via another language like Java or via a dedicated ETL (or view) once the data has landed in the destination datastore.

How does KSQL work?

So how does KSQL work under the hood? There are two concepts to keep in mind: streams and tables. A Stream is a sequence of structured data. Once an event is introduced into a stream it is immutable, meaning that it can't be updated or deleted. Imagine the number of items pushed to or pulled from a storage, e.g. "200 pieces of ProductA were stocked today, while 100 pieces of ProductB were taken out".
A Table, on the other hand, represents the current situation based on the events coming from a stream, e.g. what's the overall quantity in stock for ProductA? Facts in a table are mutable: the quantity of ProductA can be updated, or deleted if ProductA is no longer in stock.
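The relationship between the two can be sketched in a few lines of plain Python (a conceptual model, not how KSQL is implemented). The stream is an append-only list of stock movements and the table is the state obtained by folding over it. The opening stock for ProductB and the third ProductA movement are hypothetical values added so the numbers stay readable.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# A stream: an append-only sequence of immutable events (product, quantity change).
stock_movements: List[Tuple[str, int]] = [
    ("ProductB", 300),   # hypothetical opening stock, for illustration only
    ("ProductA", 200),   # 200 pieces of ProductA were stocked
    ("ProductB", -100),  # 100 pieces of ProductB were taken out
    ("ProductA", -50),   # hypothetical later movement
]


def build_table(events: List[Tuple[str, int]]) -> Dict[str, int]:
    """Fold the stream into a table holding the current quantity per product."""
    table: Dict[str, int] = defaultdict(int)
    for product, delta in events:
        table[product] += delta  # the table row is updated in place
    return dict(table)


stock_table = build_table(stock_movements)
print(stock_table)
```

Events are never modified once appended; only the derived table changes as new events arrive.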

Stream vs Table

KSQL enables the definition of streams and tables via a simple SQL dialect. Various streams and tables coming from different sources can be joined directly in KSQL enabling data combination and transformation on the fly.

Each stream or table created in KSQL will be stored in a separate topic, allowing the usual connectors or scripts to be used to extract the information from it.

KSQL in Action

Starting KSQL

KSQL can work in both standalone and client-server mode, with the first aimed at development and testing scenarios and the second supporting production environments.
In standalone mode the KSQL client and server are hosted on the same machine, in the same JVM. In client-server mode, on the other hand, a pool of KSQL servers runs on remote machines and the client connects to them over HTTP.

For my test purposes I decided to use standalone mode. The procedure is well explained in the Confluent documentation and consists of three steps:

  • Clone the KSQL repository
  • Compile the code
  • Start KSQL using local parameter
./bin/ksql-cli local

Analysing OOW Tweets

For my example I'll use the same Twitter producer created for my Wimbledon post. You may notice I'm not using Kafka Connect: this is because KSQL doesn't support the AVRO format as of now (remember it's still in the dev phase?). I therefore had to rely on the old producer, which stores the tweets in JSON format.

For my tests I've been filtering the tweets containing OOW17 and OOW (Oracle Open World 2017) and, as mentioned before, they arrive in JSON format and are stored in a Kafka topic named rm.oow. The first step is then to create a stream on top of the topic in order to structure the data before doing any transformation.
The guidelines for the stream definition can be found in the KSQL documentation; the following is a cut-down version of the code used:

CREATE STREAM twitter_raw ( \
  Created_At VARCHAR, \
  Id BIGINT, \
  Text VARCHAR, \
  Source VARCHAR, \
  Truncated VARCHAR, \
  ... 
  User VARCHAR, \
  Retweet VARCHAR, \
  Contributors VARCHAR, \
  ...) \
WITH ( \
  kafka_topic='rm.oow', \
  value_format='JSON' \
  );

Few things to notice:

  • Created_At VARCHAR: Created_At is a timestamp, however in the first stream definition I can't apply any date/timestamp conversion. I keep it as VARCHAR, which is one of the allowed types (the full list is BOOLEAN, INTEGER, BIGINT, DOUBLE, VARCHAR, ARRAY<ArrayType> and MAP<VARCHAR, ValueType>).
  • User VARCHAR: the User field is a nested JSON structure. For the basic stream definition we'll leave it as VARCHAR, with further transformations happening later on.
  • kafka_topic='rm.oow': source declaration
  • value_format='JSON': data format

Once the first stream is created, we can query it in SQL like

select Created_at, text from twitter_raw

with the output being in the form of a continuous flow: as soon as a new tweet arrives, it's displayed in the console.

simple SQL statement

The first part I want to fix is the Created_At field, which was declared as VARCHAR but needs to be converted into a timestamp. I can do it using the function STRINGTOTIMESTAMP with the mask EEE MMM dd HH:mm:ss ZZZZZ yyyy. This function converts the string to a BIGINT, which is the datatype used by Kafka to store timestamps.
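For readers who want to check what such a BIGINT looks like, the conversion can be reproduced in plain Python. This is only an illustration of the idea, not KSQL code: the strptime pattern below is my rough equivalent of the Java-style mask, the helper names are hypothetical, and the sample date is an assumed tweet time during OOW17.

```python
from datetime import datetime, timezone

# Rough Python counterpart of the mask 'EEE MMM dd HH:mm:ss ZZZZZ yyyy'.
TWITTER_FORMAT = "%a %b %d %H:%M:%S %z %Y"


def string_to_epoch_millis(value: str) -> int:
    """Parse a Twitter-style date string into milliseconds since the epoch."""
    parsed = datetime.strptime(value, TWITTER_FORMAT)
    return round(parsed.timestamp() * 1000)


def epoch_millis_to_string(millis: int) -> str:
    """Format milliseconds since the epoch back into a readable UTC string."""
    moment = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
    return moment.strftime(TWITTER_FORMAT)


millis = string_to_epoch_millis("Tue Oct 03 07:46:00 +0000 2017")
print(millis)                          # 1507016760000
print(epoch_millis_to_string(millis))  # Tue Oct 03 07:46:00 +0000 2017
```

The second helper is the reverse direction, handy whenever a raw epoch number needs to be read by a human.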

Another section of the tweet that needs further parsing is the User, which as per the previous definition returns the whole nested JSON object.

{
"id":575384370,
"id_str":"575384370",
"name":"Francesco Tisiot",
"screen_name":"FTisiot",
"location":"Verona, Italy",
"url":"http://it.linkedin.com/in/francescotisiot",
"description":"ABC"
...
}

Fortunately KSQL provides the EXTRACTJSONFIELD function, which we can use to parse the JSON and retrieve the required fields.
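The idea behind a '$.field' path lookup on a JSON string can be sketched in plain Python. The helper extract_json_field is hypothetical and only handles simple dotted paths; it also returns native JSON types, which may differ from what the real KSQL function returns.

```python
import json
from typing import Any, Optional


def extract_json_field(raw: Optional[str], path: str) -> Any:
    """Return the value at a '$.a.b' style path, or None when it is absent."""
    if raw is None or not path.startswith("$."):
        return None
    node = json.loads(raw)
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


# The User column holds the nested object as a plain string.
user = '{"id":575384370,"screen_name":"FTisiot","location":"Verona, Italy"}'

print(extract_json_field(user, "$.screen_name"))  # FTisiot
print(extract_json_field(user, "$.location"))     # Verona, Italy
print(extract_json_field(user, "$.missing"))      # None
```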

I can now define a new twitter_fixed stream with the following code

create stream twitter_fixed as \
  select STRINGTOTIMESTAMP(Created_At, 'EEE MMM dd HH:mm:ss ZZZZZ yyyy') AS Created_At, \
    Id, \
    Text, \
    Source, \
    ..., \
    EXTRACTJSONFIELD(User, '$.name') as User_name, \
    EXTRACTJSONFIELD(User, '$.screen_name') as User_screen_name, \
    EXTRACTJSONFIELD(User, '$.id') as User_id, \
    EXTRACTJSONFIELD(User, '$.location') as User_location, \
    EXTRACTJSONFIELD(User, '$.description') as description \
  from twitter_raw;

An important thing to notice is that Created_At is now encoded as BIGINT, thus if I execute select Created_At from twitter_fixed I get only the raw number. To translate it to a readable date I can use the TIMESTAMPTOSTRING function, passing the column and the date format.

The last part of the stream definition I wanted to fix is the setting of KEY and TIMESTAMP. A KEY is the unique identifier of a message and, if not declared, is auto-generated by Kafka. However the tweet JSON contains the Id, which is Twitter's unique identifier, so we should use it. TIMESTAMP associates the message timestamp with a column in the stream: Created_At should be used. I can define both in the WITH clause of the stream declaration.

create stream twitter_with_key_and_timestamp \
as \
select * from twitter_fixed \
with \
(KEY='Id', TIMESTAMP='Created_At');

When doing a select * from twitter_with_key_and_timestamp we can clearly see that KSQL adds two columns before the others containing TIMESTAMP and KEY and the two are equal to Created_At and Id.

TimeStamp Key

Now I have all the fields correctly parsed as a KSQL stream. Nice, but in my previous blog post I had almost the same for free using Kafka Connect. Now it's time to discover the next step of KSQL: tables!

Let's first create a simple table containing the number of tweets by User_screen_name.

create table tweets_by_users as \
select user_screen_name, count(Id) nr_of_tweets \
from twitter_with_key_and_timestamp \
group by user_screen_name;

When then executing a simple select * from tweets_by_users we can see the expected result.

Users and counts

Two things to notice:

  • We see a new row in the console every time a new record is inserted in the rm.oow topic. The new row contains the updated count of tweets for the screen_name in question.
  • The KEY is automatically generated by KSQL and contains the screen_name
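The "one new row per incoming record" behaviour can be modelled in plain Python as a running count that emits an update every time an event arrives. This is a conceptual sketch with made-up input, not KSQL internals.

```python
from collections import Counter
from typing import Iterable, Iterator, Tuple


def count_by_key(screen_names: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """Emit (key, updated count) each time a new event arrives for that key."""
    counts: Counter = Counter()
    for name in screen_names:
        counts[name] += 1
        yield name, counts[name]


# Three incoming tweets, two of them from the same user.
updates = list(count_by_key(["FTisiot", "Nephentur", "FTisiot"]))
for key, nr_of_tweets in updates:
    print(key, nr_of_tweets)
```

The second tweet from FTisiot does not replace the earlier console row; it shows up as a fresh row carrying the updated count, keyed by the screen name.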

I can retrieve the list of defined tables with the show tables command.

List of Tables

It's interesting to notice that the format is automatically set as JSON. The format property, configured via the VALUE_FORMAT parameter, defines how the message is stored in the topic and can either be JSON or DELIMITED.

Windowing

When grouping, KSQL provides three different window types:

  • Tumbling: Fixed size, non overlapping. The SIZE of the window needs to be specified.
  • Hopping: Fixed size, possibly overlapping. The SIZE and ADVANCE parameters need to be specified.
  • Session: Dynamically sized. A window starts with the first entry for a particular key and remains active as long as new messages with the same key arrive within the INACTIVITY_GAP, which is the parameter to be specified.
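A small plain-Python sketch may help to picture how events land in tumbling and session windows. It is a simplified model for a single key: the function names are hypothetical, window boundaries are assumed to be aligned to the epoch, and the timestamps are invented.

```python
from typing import List, Tuple


def tumbling_window_start(timestamp_ms: int, size_ms: int) -> int:
    """Start of the single tumbling window that contains the timestamp."""
    return timestamp_ms - (timestamp_ms % size_ms)


def session_windows(timestamps_ms: List[int], gap_ms: int) -> List[Tuple[int, int]]:
    """Group the timestamps of one key into (first, last) sessions.

    A session stays open while the next event arrives within gap_ms
    of the previous one; otherwise a new session is started.
    """
    sessions: List[Tuple[int, int]] = []
    for ts in sorted(timestamps_ms):
        if sessions and ts - sessions[-1][1] <= gap_ms:
            sessions[-1] = (sessions[-1][0], ts)  # extend the open session
        else:
            sessions.append((ts, ts))             # start a new session
    return sessions


# A tweet at 07:46:15 UTC falls in the 30-second window starting at 07:46:00.
print(tumbling_window_start(1507016775000, 30_000))  # 1507016760000

# Two bursts of activity separated by more than 30 seconds give two sessions.
print(session_windows([0, 10_000, 25_000, 70_000, 80_000], 30_000))
```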

Windowing

I can create a simple table definition, like the number of tweets by location for each tumbling window, with

create table rm.tweets_by_location \
as \
select user_location, \
count(Id) nr_of_tweets \
from twitter_with_key_and_timestamp \
WINDOW TUMBLING (SIZE 30 SECONDS) \
group by user_location;

The output looks like

Tumbling Window

As you can see, the KEY of the table contains both the user_location and the window timestamp (e.g. Colombes : Window{start=1507016760000 end=-}).

An example of hopping can be created with a similar query

create table rm.tweets_by_location_hopping \
as \
select user_location, \
count(Id) nr_of_tweets \
from twitter_with_key_and_timestamp \
WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS) \
group by user_location;

With the output being like

Hopping Window

It's interesting to notice that each entry (e.g. Europe North, Switzerland) is listed at least three times. This is because at any point in time there are three overlapping windows (SIZE is 30 seconds and ADVANCE is 10 seconds). The same example can be turned into session windows by just defining WINDOW SESSION (30 SECONDS).
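The "three overlapping windows" arithmetic can be verified with a short plain-Python sketch. The helper below is hypothetical and assumes windows of the form [start, start + SIZE) aligned to the epoch; with SIZE 30 seconds and ADVANCE 10 seconds every timestamp belongs to 30 / 10 = 3 windows.

```python
from typing import List


def hopping_window_starts(timestamp_ms: int, size_ms: int, advance_ms: int) -> List[int]:
    """Start times of every hopping window [start, start + size) containing the timestamp."""
    start = timestamp_ms - (timestamp_ms % advance_ms)  # most recent window start
    starts: List[int] = []
    while start > timestamp_ms - size_ms and start >= 0:
        starts.append(start)
        start -= advance_ms
    return sorted(starts)


# A tweet at 07:46:15 UTC with SIZE 30 SECONDS and ADVANCE BY 10 SECONDS.
windows = hopping_window_starts(1507016775000, 30_000, 10_000)
print(windows)       # three window start times
print(len(windows))  # 3
```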

Windowing is a useful option, especially when combined with HAVING clauses, since it gives the option to define metrics for real-time analysis.
E.g. I may be interested only in items that have been ordered more than 100 times in the last hour or, in my Twitter example, in user_locations having a nr_of_tweets greater than 5 in the last 30 minutes.
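As a rough illustration of the Twitter case, the plain-Python sketch below counts tweets per (location, 30-minute tumbling window) and then keeps only the groups above a threshold, which is the role HAVING plays. The function name, the sample data and the choice of a tumbling window are my assumptions.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

WINDOW_MS = 30 * 60 * 1000  # 30-minute tumbling window


def busy_locations(
    tweets: Iterable[Tuple[int, str]], threshold: int
) -> Dict[Tuple[str, int], int]:
    """Count tweets per (location, window start) and keep counts above threshold."""
    counts: Counter = Counter()
    for timestamp_ms, location in tweets:
        window_start = timestamp_ms - (timestamp_ms % WINDOW_MS)
        counts[(location, window_start)] += 1
    # Equivalent in spirit to: HAVING count(Id) > threshold
    return {key: n for key, n in counts.items() if n > threshold}


# Invented sample: six tweets from one location, two from another, same window.
sample = [(i * 60_000, "Verona, Italy") for i in range(6)]
sample += [(5_000, "London"), (9_000, "London")]

result = busy_locations(sample, threshold=5)
print(result)
```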

Joining

So far so good: a nice set of SQL functions on top of data coming from a single source (in my case Twitter). In the real world however we'll need to mix information coming from disparate sources... what if I tell you that you can achieve that in a single KSQL statement?

Face KSQL

To show an integration example I created a simple topic known_twitters using the kafka-console-producer.

./bin/kafka-console-producer --topic known_twitters --broker-list myserver:9092

Once started I can type in messages and those will be stored in the known_twitters topic. For this example I'll insert the twitter handle and real name of known people that are talking about OOW. The format will be:

username,real_name

like

FTisiot,Francesco Tisiot
Nephentur,Christian Berg

Once the rows are inserted with the producer, I'm able to create a KSQL stream on top of the topic with the following syntax (note the VALUE_FORMAT='DELIMITED')

create stream people_known_stream (\
screen_name VARCHAR, \
real_name VARCHAR) \
WITH (\
KAFKA_TOPIC='known_twitters', \
VALUE_FORMAT='DELIMITED');
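What the DELIMITED format amounts to can be shown with a few lines of plain Python: each message is a comma-separated line whose values are mapped, in order, onto the declared columns. This is an illustrative model only; parse_delimited is a hypothetical helper and the exact quoting rules KSQL applies are not covered here.

```python
import csv
from typing import Dict, List

# Column order as declared in the stream definition.
COLUMNS: List[str] = ["screen_name", "real_name"]


def parse_delimited(line: str) -> Dict[str, str]:
    """Map one comma-separated message onto the declared columns."""
    values = next(csv.reader([line]))
    return dict(zip(COLUMNS, values))


messages = ["FTisiot,Francesco Tisiot", "Nephentur,Christian Berg"]
rows = [parse_delimited(message) for message in messages]
for row in rows:
    print(row["screen_name"], "->", row["real_name"])
```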

I can now join this stream with the other streams or tables built previously. However when trying the following statement

select user_screen_name from rm.tweets_by_users a join PEOPLE_KNOWN_STREAM b on a.user_screen_name=b.screen_name;

I get a nice error

Unsupported join logical node: Left: io.confluent.ksql.planner.plan.StructuredDataSourceNode@6ceba9de , Right: io.confluent.ksql.planner.plan.StructuredDataSourceNode@69518572

This is due to the fact that as of now KSQL supports only joins between a stream and a table, and the stream needs to be specified first in the KSQL query. If I then just swap the two sources in the select statement above:

select user_screen_name from PEOPLE_KNOWN_STREAM a join rm.tweets_by_users b on a.screen_name=b.user_screen_name;

...I get another error

Join type is not supportd yet: INNER

We have to remember that KSQL is still in developer preview; a lot of new features will be included before the official release.

Adding a LEFT JOIN clause (see the related bug) solves the issue and I should be able to see the combined data. However when running

select * from PEOPLE_KNOWN_STREAM left join TWEETS_BY_USERS on screen_name=user_screen_name;

I didn't retrieve any rows. After adding a proper KEY to the stream definition

create stream PEOPLE_KNOWN_STREAM_PARTITIONED \
as select screen_name , \
real_name from  people_known_stream \
PARTITION BY screen_name;

I was able to retrieve the correct rowset! Again, we are in the early stages of KSQL; these issues will be fixed or better documented in future releases!
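Conceptually, a stream-table LEFT JOIN looks up each stream event in the table by key and keeps the event even when no match exists. The plain-Python sketch below models that with a dictionary; the tweet count is invented, and the suggestion that the key-based lookup explains why the stream needed a proper key is my own assumption, not something confirmed by the KSQL documentation.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple


def left_join(
    stream: Iterable[Tuple[str, str]], table: Dict[str, int]
) -> Iterator[Tuple[str, str, Optional[int]]]:
    """For each stream event, look up the table by key; None when no match."""
    for screen_name, real_name in stream:
        yield screen_name, real_name, table.get(screen_name)


# Table: current tweet count per screen name (hypothetical value).
tweets_by_users = {"FTisiot": 12}

# Stream: known people, keyed by screen name.
people_known = [("FTisiot", "Francesco Tisiot"), ("Nephentur", "Christian Berg")]

joined = list(left_join(people_known, tweets_by_users))
for row in joined:
    print(row)
```

Since the lookup is driven entirely by the key, an event whose key does not line up with the table's key cannot find its match, which is at least consistent with the empty result seen before re-keying.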

Conclusion

As we saw in this small example, all transformations, summaries and data enrichments were done directly in Kafka with a dialect that is very easy to learn for anyone already familiar with SQL. All the created streams and tables are stored as Kafka topics, so the standard connectors can be used for sink integration.

As mentioned above, KSQL is still in developer preview, but the overall idea is very simple and at the same time powerful. If you want to learn more, check out the Confluent page and the KSQL GitHub repository!