Trickle-Feeding Log Data into the HBase NoSQL Database using Flume

May 21, 2014 Big Data

The other day I posted an article on the blog about using Flume to transport Apache web log entries from our website into Hadoop, with the final destination for the entries being an HDFS file that essentially mirrors the contents of the webserver log file. Once you’ve set this transport mechanism up, you could create a Hive table over the HDFS files, for example, or further transform the data using Pig, Spark or some other mechanism.

When you load data into HDFS files though, there are a couple of things you need to be aware of. HDFS is optimised for large, streaming reads of files stored in very large disk blocks, with the classic use-case being MapReduce transformations that crunch large sets of incoming data and hand off the results to another process. What it’s not good at is random retrieval of single records, something you’ll notice if you try to return a single row from a Hive table. Moreover, HDFS files are write-once, with no updates or overwrites, which is why Hive only supports SELECTs and not UPDATEs or DELETEs. Altogether, whilst HDFS is great for landing and then processing large chunks of data, if you’re looking for more granular, database-type storage on Hadoop, you’ll need to think of something else.

And within the context of Cloudera Hadoop, that other thing is HBase, a “NoSQL” database that’s also open-source and runs on the Hadoop framework. Whilst you can work with HBase in similar ways to how you work with relational databases – you can create columns, load data into it, insert and update data and so forth – HBase and NoSQL are in lots of ways the complete opposite of relational databases like Oracle Database, as they trade off things we normally take for granted but that have performance and scalability impacts – ACID transactions, the ability to support complex table relationships, very rich query languages and application support – for extreme scalability and flexibility. If you’re scared of losing your data then HBase is one of the better NoSQL databases, with strong (rather than “eventual”) consistency, automatic sharding and lots of high-availability features, but it’s not designed for running your payroll (yet).

One reason we might want to land data in HBase or another NoSQL database, rather than in regular HDFS files, is if we then want to do fast individual record lookups within the landed data. Another reason would be HBase’s support for complex record types, making it easy to store, for example, nested XML datasets, and its ability – like the Endeca Server – to hold completely different sets of “columns” for each row in the database, and even to version those rows, giving us almost a “multi-dimensional” database. Internally, HBase stores data as key-value pairs, which is what lets it hold completely different data in each database row. Under the covers, HBase data is in turn stored in indexed “StoreFiles” within HDFS, giving it HDFS’s scalability and access to the Hadoop framework, but adding fast random access to individual records.


Where HBase (and most NoSQL databases) get complicated though is that there’s no SQL Developer or TOAD to create tables, and no SQL or PL/SQL to load and manipulate them – it’s all done through Java and custom code. This article by Lars George, who gave the Hadoop Masterclass at last week’s BI Forum, goes into a bit more detail, along with his HBase slides and his book, “HBase: The Definitive Guide”.

So let’s look at a simple example of loading Apache CombinedLogFormat log file entries into HBase, using Flume to transport and ingest the data from our webserver into Hadoop; the example was put together, again, by Nelio Guimaraes from the RM team. We’ll start by defining the HBase table, which like regular relational tables has rows, but which has the concept of column families and column qualifiers rather than just columns. In practice, a column family + qualifier name makes what we’d normally think of as a column, but crucially, under the covers, columns within a family are stored together on disk, like column-store relational databases, making them fast to query and randomly access. Like a spreadsheet or OLAP database, each combination of row and column family/qualifier is called a “cell”, and only populated cells are stored on disk, with the added bonus of cell entries being timestamped, giving us the ability to retrieve previous versions of cell entries, like the temporal query feature in Oracle Database 12c.
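
To make the row / column family / qualifier / timestamp idea a bit more concrete, here is a minimal in-memory sketch of that logical layout using nothing but Java collections. It is a toy model for illustration only, not the HBase client API and not how HBase lays data out on disk; the class name ToyHBaseTable and its methods are made up for this example.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy in-memory model of HBase's logical layout; NOT the HBase client API. */
final class ToyHBaseTable {
    // row key -> "family:qualifier" -> timestamp (newest first) -> value
    private final NavigableMap<String, NavigableMap<String, NavigableMap<Long, String>>> rows =
            new TreeMap<>();

    /** Stores a new version of one cell; columns never written take up no space. */
    void put(String rowKey, String family, String qualifier, long timestamp, String value) {
        rows.computeIfAbsent(rowKey, k -> new TreeMap<>())
            .computeIfAbsent(family + ":" + qualifier,
                             k -> new TreeMap<Long, String>(Comparator.reverseOrder()))
            .put(timestamp, value);
    }

    /** Returns the newest version of a cell, or null if the cell was never populated. */
    String get(String rowKey, String family, String qualifier) {
        NavigableMap<Long, String> versions = versions(rowKey, family, qualifier);
        return versions == null ? null : versions.firstEntry().getValue();
    }

    /** Returns the newest version written at or before the given timestamp, or null. */
    String getAsOf(String rowKey, String family, String qualifier, long timestamp) {
        NavigableMap<Long, String> versions = versions(rowKey, family, qualifier);
        if (versions == null) {
            return null;
        }
        // Keys are sorted newest-first, so ceilingEntry finds the latest timestamp <= the argument.
        Map.Entry<Long, String> entry = versions.ceilingEntry(timestamp);
        return entry == null ? null : entry.getValue();
    }

    /** Counts populated cells, counting each row/column pair once regardless of versions. */
    int populatedCells() {
        return rows.values().stream().mapToInt(Map::size).sum();
    }

    private NavigableMap<Long, String> versions(String rowKey, String family, String qualifier) {
        NavigableMap<String, NavigableMap<Long, String>> row = rows.get(rowKey);
        return row == null ? null : row.get(family + ":" + qualifier);
    }

    public static void main(String[] args) {
        ToyHBaseTable table = new ToyHBaseTable();
        table.put("row1", "common", "clientip", 100L, "10.0.0.1");
        table.put("row1", "http", "status", 100L, "200");
        table.put("row1", "http", "status", 200L, "304");   // a second version of the same cell
        table.put("row2", "misc", "agent", 150L, "curl");   // row2 has a different set of columns

        System.out.println(table.get("row1", "http", "status"));           // 304
        System.out.println(table.getAsOf("row1", "http", "status", 150L)); // 200
        System.out.println(table.get("row2", "http", "status"));           // null
        System.out.println(table.populatedCells());                        // 3
    }
}
```

Note how row2 never pays for the columns it doesn’t use, and how the older value of http:status is still retrievable by timestamp; those are the two properties of the model that matter most for the log-loading example.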


For more details on how HBase stores data, and how HBase schemas are defined, the white paper “Introduction to HBase Schema Design” by Cloudera’s Amandeep Khurana is a good reference point and introduction. So let’s go into the HBase shell and create a table to contain our log data. We’ll define it as containing three column families (“common”, “http” and “misc”), with the actual column qualifiers defined at the point we load data into the table. One of the key features of HBase, and most NoSQL databases, is that you can introduce new columns into a store at the point of loading, just by declaring them, with each row potentially containing its own unique selection of columns – which is where Endeca Server gets its ability to store “jagged datasets” with potentially different attribute sets held for groups of rows.

In this example, the way we’re going to populate the HBase table is to use Flume; like the Flume and HDFS example the other day, we’ll use a “sink”, in this case an HBase sink, to take the incoming Flume activity off the channel and load it into the HBase table. Flume actually has two HBase sinks: one called HBaseSink, which writes synchronously (more straightforward but slower), and another called AsyncHBaseSink, which writes asynchronously, potentially with higher overall throughput than synchronous writes and with full consistency even if there’s a failure (based on replaying the channel data), but with a slightly more complex serialisation approach. We’ll use the asynchronous sink in this example, and assuming you’ve already got the source configuration file set up (see the previous blog post on Flume and HDFS for an example), the target Flume conf file in our case looked like this:
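
Purely as an illustration, a target-agent configuration along these lines might be sketched as follows. The agent name (collector), sink name (HbaseOut), sink type and table name come from the settings discussed in this article; everything else (the source and channel names, the port, the capacities, the serializer class name and the format of the column list) is a hypothetical placeholder rather than the values from our setup.

```properties
# Target ("collector") agent: Avro source -> memory channel -> AsyncHBaseSink
collector.sources = AvroIn
collector.channels = mc1
collector.sinks = HbaseOut

# Avro source receiving events from the webserver-side agent (port is a placeholder)
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = 0.0.0.0
collector.sources.AvroIn.port = 4545
collector.sources.AvroIn.channels = mc1

# In-memory channel buffering events between source and sink
collector.channels.mc1.type = memory
collector.channels.mc1.capacity = 10000

# Asynchronous HBase sink
collector.sinks.HbaseOut.type = org.apache.flume.sink.hbase.AsyncHBaseSink
collector.sinks.HbaseOut.channel = mc1
collector.sinks.HbaseOut.table = apache_access_log
# The sink expects a column family setting even when the serializer writes to several families
collector.sinks.HbaseOut.columnFamily = common
collector.sinks.HbaseOut.batchSize = 100

# Hypothetical custom serializer class, and an assumed comma-separated family:qualifier list
collector.sinks.HbaseOut.serializer = com.example.flume.ApacheLogSerializer
collector.sinks.HbaseOut.serializer.columns = common:clientip,common:ident,common:auth,common:timestamp,http:request,http:status,http:bytes,misc:referrer,misc:agent
```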

A few points to note:

  • The collector.sinks.HbaseOut.type setting determines the sink type we’ll use, in this case org.apache.flume.sink.hbase.AsyncHBaseSink
  • collector.sinks.HbaseOut.table sets the HBase table name we’ll load, “apache_access_log”
  • collector.sinks.HbaseOut.serializer.columns actually defines the column qualifiers, in this case mapping incoming serialised log file rows into a set of HBase column families and qualifiers
  • collector.sinks.HbaseOut.serializer is the most important bit – it tells the sink how to turn the incoming Flume data into HBase loads, through a Java program called the “serializer”.

And it’s this serializer, the Java program that does the actual loading of the HBase table, that’s the final piece of the jigsaw. There are standard templates to use when writing this piece of code, and in our case the serializer looked like this:

HBase, rather than supporting the regular SELECTs and INSERTs we’re used to with Oracle, instead uses “get” and “put” methods to retrieve and store data, along with “delete” and “scan”. The regular synchronous HBase sink uses these methods directly, taking data off the Flume channel and inserting it into the HBase table (or indeed, updating existing rows based on the row key), whilst the asynchronous method uses a layer in between the incoming data and the write, allowing data (or “events”) to continue streaming in even if all the downstream data hasn’t yet been committed. It’s this code though that maps each incoming bit of data – in this case, a parsed log file entry – to column families and qualifiers in the HBase table, and you’d need to write new code like this, or amend the existing one, if you wanted to load other HBase tables in your Hadoop cluster – a long way from the point-and-click ETL approach we get with ODI, but a lot more flexible too (if that’s what you want).
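
The mapping step itself can be sketched without any Flume or HBase dependencies. The example below is a simplified, standalone illustration, not the serializer used in this setup: it parses one combined-format log line with a regular expression and returns “family:qualifier” to value pairs. The class name LogLineToCells and the column names are assumptions made for this sketch. A real AsyncHBaseSink serializer would do something similar inside Flume’s AsyncHbaseEventSerializer interface and hand the results back to the sink as put requests.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Maps one combined-format log line to "family:qualifier" -> value pairs (illustrative only). */
final class LogLineToCells {
    // host ident user [time] "request" status bytes "referer" "user-agent"
    // Simplification: quoted fields are assumed not to contain escaped double quotes.
    private static final Pattern COMBINED = Pattern.compile(
        "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"([^\"]*)\" (\\d{3}) (\\S+) \"([^\"]*)\" \"([^\"]*)\"$");

    // Hypothetical column layout: one target column per regex group, in order.
    private static final String[] COLUMNS = {
        "common:clientip", "common:ident", "common:auth", "common:timestamp",
        "http:request", "http:status", "http:bytes",
        "misc:referrer", "misc:agent"
    };

    static Map<String, String> toCells(String line) {
        Matcher m = COMBINED.matcher(line);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a combined-format log line: " + line);
        }
        Map<String, String> cells = new LinkedHashMap<>();
        for (int i = 0; i < COLUMNS.length; i++) {
            String value = m.group(i + 1);
            // Apache writes "-" for a missing field; skip it so the cell stays unpopulated.
            if (!"-".equals(value)) {
                cells.put(COLUMNS[i], value);
            }
        }
        return cells;
    }

    public static void main(String[] args) {
        String line = "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "
            + "\"GET /apache_pb.gif HTTP/1.0\" 200 2326 "
            + "\"http://www.example.com/start.html\" \"Mozilla/4.08 [en] (Win98; I ;Nav)\"";
        // Prints one "family:qualifier = value" line per populated cell.
        toCells(line).forEach((column, value) -> System.out.println(column + " = " + value));
    }
}
```

Skipping the “-” placeholders rather than storing them is a design choice that leans on HBase only storing populated cells; a serializer could equally choose to store them as-is.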

Then it’s a case of compiling the Java code, like this:

Next, we had to run the following command before enabling Flume, because of an issue we found with ZooKeeper that stopped Flume working in this setup:

and finally, we start up the Flume target server agent, followed by the source one (again see the previous article for setting up the source Flume agent):

Then, after a while, log data starts getting loaded into the HBase table. You can check on it using Hue, and the HBase Browser:


Or you can go back into the HBase shell and run the scan command to view the data, with each line of output representing a cell in the overall table storage:

This is all great, and a good starting point if you plan to process your data with other Java programs as the next step. But what if you want to view the data in a more convenient way, perhaps as a regular table? To do that you can use Hive again, this time using Hive’s HBase integration features to tell it the data is stored in HBase format, and to let it know how to display the various HBase column families and qualifiers. In our case, the DDL to create the corresponding Hive table looks like this:

giving us the ability, either from the Hive shell like this, or from tools like OBIEE and ODI, to query the NoSQL database and bring its data into more regular, relational data stores.

We’ll be covering more on HBase, and Oracle’s NoSQL Database, in future articles on the blog.