Trickle-Feeding Log Data into the HBase NoSQL Database using Flume

The other day I posted an article on the blog about using Flume to transport Apache web log entries from our website into Hadoop, with the final destination for the entries being an HDFS file that essentially mirrors the contents of the webserver log file. Once you’ve set this transport mechanism up, you could create a Hive table over the HDFS files, for example, or further transform the data using Pig, Spark or some other mechanism.

When you load data into HDFS files though, there are a couple of things you need to be aware of. HDFS is optimised for large, streaming reads of files stored in very large disk blocks, with the classic use-case being MapReduce transformations that crunch large sets of incoming data and hand off the results to another process. What it’s not good at is random retrieval of individual records, something you’ll notice if you try to return a single row from a Hive table. Moreover, HDFS files are write-once, with no updates or overwrites, which is why Hive only supports SELECTs and not UPDATEs or DELETEs. Altogether, whilst HDFS is great for landing and then processing large chunks of data, if you’re looking for more granular, database-type storage on Hadoop, you’ll need to think of something else.

And within the context of Cloudera Hadoop, that other thing is HBase, a “NoSQL” database that’s also open-source and runs on the Hadoop framework. Whilst you can work with HBase in similar ways to how you work with relational databases - you can create columns, load data into it, insert and update data and so forth - HBase and NoSQL are in lots of ways the complete opposite of relational databases like Oracle Database: they trade off things we normally take for granted but that carry performance and scalability costs - ACID transactions, the ability to support complex table relationships, very rich query languages and application support - in return for extreme scalability and flexibility. If you’re scared of losing your data then HBase is one of the better NoSQL databases, with strong (rather than “eventual”) consistency, automatic sharding and lots of high-availability features, but it’s not designed for running your payroll (yet).

One reason we might want to land data in HBase or another NoSQL database, rather than in regular HDFS files, is if we then want to do fast individual record lookups within the landed data. Another reason would be HBase’s support for complex record types, making it easy to store, for example, nested XML datasets, and its ability - like the Endeca Server - to hold completely different sets of “columns” for each row in the database, and even to version those rows, giving us almost a “multi-dimensional” database. Internally, HBase stores data as key-value pairs, giving it the ability to hold completely different data in each database row; under the covers this data is in turn stored in indexed “StoreFiles” within HDFS, giving HBase HDFS’s scalability and access to the Hadoop framework, while adding fast random access to individual records.
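
To make that key-value model concrete, here’s a minimal sketch - not part of the original example - using the HBase Java client API of that era (HTable and Put.add, since superseded by Connection/Table and Put.addColumn), writing two rows with completely different sets of qualifiers into the apache_access_log table we create below. HBase accepts both without any schema change, because each cell is just a key-value pair.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class JaggedRowsSketch 
{
    public static void main(String[] args) throws Exception 
    {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "apache_access_log");

        // First row: two qualifiers in the "common" column family
        Put row1 = new Put(Bytes.toBytes("row-1"));
        row1.add(Bytes.toBytes("common"), Bytes.toBytes("hostname"), Bytes.toBytes("89.154.89.101"));
        row1.add(Bytes.toBytes("common"), Bytes.toBytes("remoteuser"), Bytes.toBytes("-"));

        // Second row: an entirely different qualifier, in a different family
        Put row2 = new Put(Bytes.toBytes("row-2"));
        row2.add(Bytes.toBytes("http"), Bytes.toBytes("requeststatus"), Bytes.toBytes("200"));

        table.put(row1);
        table.put(row2);
        table.close();
    }
}

Each call to add() simply creates another cell keyed by row key, column family, qualifier and timestamp, which is why two rows never need to share the same set of columns.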


Where HBase (and most NoSQL databases) get complicated though is that there’s no SQL Developer or TOAD to create tables, and no SQL or PL/SQL to load and manipulate them - it’s all done through Java and custom code. This article by Lars George, who gave the Hadoop Masterclass at last week’s BI Forum, goes into a bit more detail, along with his HBase slides and his book, “HBase: The Definitive Guide”.

So let’s look at a simple example of loading Apache CombinedLogFormat log file entries into HBase, using Flume to transport and ingest the data from our webserver into Hadoop - an example put together by Nelio Guimaraes from the RM team. We’ll start by defining the HBase table, which like a regular relational table has rows, but which uses the concept of column families and column qualifiers rather than just columns. In practice, a column family plus a qualifier name makes up what we’d normally think of as a column, but crucially, under the covers the columns within a family are stored together on disk, as in column-store relational databases, making them fast to query and randomly access. Like a spreadsheet or OLAP database, each combination of row and column family/qualifier is called a “cell”, and only populated cells are stored on disk; as an added bonus, cell entries are timestamped, giving us the ability to retrieve previous versions of cell entries, rather like the temporal query feature in Oracle Database 12c.
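
As a rough, hedged illustration of those timestamped cells - again not from the original example, and assuming the column families were created with VERSIONS set high enough to retain more than one version of each cell - the Java client can ask for several versions of a row and walk the family/qualifier/timestamp map it returns:

import java.util.Map;
import java.util.NavigableMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CellVersionsSketch 
{
    public static void main(String[] args) throws Exception 
    {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "apache_access_log");

        // "1400628560331" is one of the row keys that shows up in the scan output later on
        Get get = new Get(Bytes.toBytes("1400628560331"));
        get.setMaxVersions(3);
        Result result = table.get(get);

        // Walk family -> qualifier -> timestamp -> value: every retained version of every cell
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cells = result.getMap();
        for (byte[] family : cells.keySet()) 
        {
            for (byte[] qualifier : cells.get(family).keySet()) 
            {
                for (Map.Entry<Long, byte[]> version : cells.get(family).get(qualifier).entrySet()) 
                {
                    System.out.println(Bytes.toString(family) + ":" + Bytes.toString(qualifier)
                            + " @ " + version.getKey() + " = " + Bytes.toString(version.getValue()));
                }
            }
        }
        table.close();
    }
}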


For more details on how HBase stores data, and how HBase schemas are defined, the white paper “Introduction to HBase Schema Design” by Cloudera’s Amandeep Khurana is a good reference point and introduction. So let’s go into the HBase shell and create a table to contain our log data; we’ll define it as containing three column families (“common”, “http” and “misc”), with the actual column qualifiers defined at the point we load data into the table. One of the key features of HBase, and of most NoSQL databases, is that you can introduce new columns into a store at the point of loading, just by declaring them, with each row potentially containing its own unique selection of columns - which is where Endeca Server gets its ability to store “jagged datasets”, with potentially different attribute sets held for groups of rows.

[root@cdh5-node1 ~]# hbase shell
14/05/21 06:00:07 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell

hbase(main):001:0> list
TABLE                                                                           
0 row(s) in 2.8030 seconds

=> []
hbase(main):002:0> create 'apache_access_log', 
hbase(main):003:0* {NAME => 'common'},
hbase(main):004:0* {NAME => 'http'},
hbase(main):005:0* {NAME => 'misc'}
0 row(s) in 0.5460 seconds

In this example, the way we’re going to populate the HBase table is to use Flume; as with the Flume and HDFS example from the other day, we’ll use a “sink”, in this case an HBase sink, to take the incoming Flume events off the channel and load them into the HBase table. Flume actually has two HBase sinks: one called HBaseSink, which writes synchronously (more straightforward but slower), and another called AsyncHBaseSink, which writes asynchronously, potentially with higher overall throughput than synchronous writes and with full consistency even if there’s a failure (based on replaying the channel data), but with a slightly more complex serialisation approach. We’ll use the asynchronous sink in this example, and assuming you’ve already got the source configuration file set up (see the previous blog post on Flume and HDFS for an example), the target Flume conf file in our case looked like this:

## TARGET AGENT ##
## configuration file location:  /etc/flume-ng/conf
## START Agent: flume-ng agent -c conf -f /etc/flume-ng/conf/flume-trg-agent-hbase.conf -n collector

#http://flume.apache.org/FlumeUserGuide.html#avro-source
collector.sources = AvroIn
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = 0.0.0.0
collector.sources.AvroIn.port = 4545
collector.sources.AvroIn.channels = mc1 mc2 mc3

## Channels ##
## Source writes to 3 channels, one for each sink
collector.channels = mc1 mc2 mc3

#http://flume.apache.org/FlumeUserGuide.html#memory-channel

collector.channels.mc1.type = memory
collector.channels.mc1.capacity = 1000

collector.channels.mc2.type = memory
collector.channels.mc2.capacity = 1000

collector.channels.mc3.type = memory
collector.channels.mc3.capacity = 1000

## Sinks ##
collector.sinks = LocalOut HadoopOut HbaseOut

## Write copy to Local Filesystem 
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
collector.sinks.LocalOut.type = file_roll
collector.sinks.LocalOut.sink.directory = /var/log/flume-ng
collector.sinks.LocalOut.sink.rollInterval = 0
collector.sinks.LocalOut.channel = mc1

## Write to HDFS
#http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
collector.sinks.HadoopOut.type = hdfs
collector.sinks.HadoopOut.channel = mc2
collector.sinks.HadoopOut.hdfs.path = /user/root/flume-channel/%{log_type}/%d%m%Y
collector.sinks.HadoopOut.hdfs.fileType = DataStream
collector.sinks.HadoopOut.hdfs.writeFormat = Text
collector.sinks.HadoopOut.hdfs.rollSize = 0
collector.sinks.HadoopOut.hdfs.rollCount = 10000
collector.sinks.HadoopOut.hdfs.rollInterval = 600

###############################################################
# HBase sink config 
###############################################################
collector.sinks.HbaseOut.type = org.apache.flume.sink.hbase.AsyncHBaseSink
collector.sinks.HbaseOut.channel = mc3
collector.sinks.HbaseOut.table = apache_access_log
collector.sinks.HbaseOut.columnFamily = common
collector.sinks.HbaseOut.batchSize = 5000
collector.sinks.HbaseOut.serializer = com.hbase.log.util.AsyncHbaseLogEventSerializer
collector.sinks.HbaseOut.serializer.columns = common:rowKey,common:hostname,common:remotehost,common:remoteuser,common:eventtimestamp,http:requestmethod,http:requeststatus,http:responsebytes,misc:referrer,misc:agent

A few points to note:

  • The collector.sinks.HbaseOut.type setting determines the sink type we’ll use, in this case org.apache.flume.sink.hbase.AsyncHBaseSink
  • collector.sinks.HbaseOut.table sets the HBase table name we’ll load, “apache_access_log”
  • collector.sinks.HbaseOut.serializer.columns actually defines the column qualifiers, in this case mapping incoming serialised log file rows into a set of HBase column families and qualifiers
  • collector.sinks.HbaseOut.serializer is the most important bit - it tells the sink how to turn the incoming Flume events into HBase puts, via a Java class called the “serializer”.

And it’s this serializer, the Java program that does the actual loading of the HBase table, that’s the final piece of the jigsaw. There are standard templates to use when writing this piece of code, and in our case the serializer looked like this:

package com.hbase.log.util;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.*;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;

import com.google.common.base.Charsets;
/**
 * A serializer for the AsyncHBaseSink, which splits the event body into
 * multiple columns and inserts them into a row whose key is available in
 * the headers
 *
 * Originally from https://blogs.apache.org/flume/entry/streaming_data_into_apache_hbase
 */
public class AsyncHbaseLogEventSerializer implements AsyncHbaseEventSerializer 
{
    private byte[] table;
    private byte[] colFam;
    private Event currentEvent;
    private byte[][] columnNames;
    private final List<PutRequest> puts = new ArrayList<PutRequest>();
    private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();
    private byte[] currentRowKey;
    private final byte[] eventCountCol = "eventCount".getBytes();
    // private String delim;

    @Override
    public void initialize(byte[] table, byte[] cf) 
    {
        this.table = table;
        this.colFam = cf;
    }

    @Override
    public void setEvent(Event event) 
    {
        // Set the event and verify that the rowKey is not present
        this.currentEvent = event;
        String rowKeyStr = currentEvent.getHeaders().get("rowKey");
        //if (rowKeyStr == null) {
        //  throw new FlumeException("No row key found in headers!");
        //}
        //currentRowKey = rowKeyStr.getBytes();
    }

    public String[] logTokenize(String event)
    {

        // Apache CombinedLogFormat; note that the response bytes field can be "-"
        // when no body was returned (e.g. a 304), so match \S+ rather than \d+ for that group
        String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\S+) \"([^\"]+)\" \"([^\"]+)\"";
        Pattern p = Pattern.compile(logEntryPattern);
        Matcher matcher = p.matcher(event);

        if (!matcher.matches()) 
        {
            System.err.println("Bad log entry (or problem with RE?):");
            System.err.println(event);
            return null;
        }

        String[] columns = new String[matcher.groupCount()+1];
        
        columns[0]= Long.toString(System.currentTimeMillis());
        
        for (int i = 1; i <= matcher.groupCount(); i++) 
        {
            columns[i] = matcher.group(i);
        }

        return columns;

    }

    @Override
    public List<PutRequest> getActions() 
    {
        // Split the event body and get the values for the columns
        String eventStr = new String(currentEvent.getBody());
        long unixTime = System.currentTimeMillis();
        //String[] cols = eventStr.split(",");
        //String[] cols = eventStr.split(regEx);
        //String[] cols = eventStr.split("\\s+");
        //String[] cols = eventStr.split("\\t");
        //String[] cols = eventStr.split(delim);
        String[] cols = logTokenize(eventStr);
        puts.clear();

        // If the event didn't match the log format regex, skip it rather than
        // hitting a NullPointerException on cols.length below
        if (cols == null) 
        {
            return puts;
        }
        String[] columnFamilyName;
        byte[] bCol;
        byte[] bFam;
        for (int i = 0; i < cols.length; i++) 
        {
            //Generate a PutRequest for each column.
            columnFamilyName = new String(columnNames[i]).split(":");
            bFam = columnFamilyName[0].getBytes();
            bCol = columnFamilyName[1].getBytes();

            if (i == 0) 
            {
                currentRowKey = cols[i].getBytes();
            }
            //PutRequest req = new PutRequest(table, currentRowKey, colFam,
            //columnNames[i], cols[i].getBytes());
            PutRequest req = new PutRequest(table, currentRowKey, bFam,
            bCol, cols[i].getBytes());
            puts.add(req);
        }
        return puts;
    }

    @Override
    public List<AtomicIncrementRequest> getIncrements() 
    {
        incs.clear();
        //Increment the number of events received
        incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
        return incs;
    }

    @Override
    public void cleanUp() 
    {
        table = null;
        colFam = null;
        currentEvent = null;
        columnNames = null;
        currentRowKey = null;
    }

    @Override
    public void configure(Context context) 
    {
        //Get the column names from the configuration
        String cols = new String(context.getString("columns"));
        String[] names = cols.split(",");
        columnNames = new byte[names.length][];
        int i = 0;
        
        for(String name : names) 
        {
            columnNames[i++] = name.getBytes();
        }
        
        //delim = new String(context.getString("delimiter"));
    }

    @Override
    public void configure(ComponentConfiguration conf) {}
}

HBase, rather than supporting the regular SELECTs and INSERTs we’re used to with Oracle, instead uses “get” and “put” methods to retrieve and store data, along with “delete” and “scan”. The regular synchronous HBase sink uses these methods directly, taking data off the Flume channel and inserting it into the HBase table (or indeed, updating existing rows based on the row key), whilst the asynchronous sink adds a layer in-between the incoming data and the write, allowing data (or “events”) to continue streaming in even if all the downstream data hasn’t yet been committed. It’s this code though that maps each incoming bit of data - in this case, a parsed log file entry - to column families and qualifiers in the HBase table, and you’d need to write new code like this, or amend the existing one, if you wanted to load other HBase tables in your Hadoop cluster - a long way from the point-and-click ETL approach we get with ODI, but a lot more flexible too (if that’s what you want).
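
To make those verbs a bit more concrete, here’s a hedged sketch - not part of the original post, and again using the HTable-era client API - of what a “get” (single-row lookup by key) and a “scan” (iterating over a range of rows) look like in Java against the table the serializer loads:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadAccessLogSketch 
{
    public static void main(String[] args) throws Exception 
    {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "apache_access_log");

        // "get": random access to one row by its key (here, one of the
        // timestamp-derived row keys the serializer generates)
        Get get = new Get(Bytes.toBytes("1400628560331"));
        Result row = table.get(get);
        System.out.println("hostname = "
                + Bytes.toString(row.getValue(Bytes.toBytes("common"), Bytes.toBytes("hostname"))));

        // "scan": iterate over a range of rows - with no start/stop key set, the whole table
        Scan scan = new Scan();
        ResultScanner scanner = table.getScanner(scan);
        try 
        {
            for (Result r : scanner) 
            {
                System.out.println(Bytes.toString(r.getRow()));
            }
        } 
        finally 
        {
            scanner.close();
        }
        table.close();
    }
}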

Then it’s a case of compiling the Java code, like this:

mkdir com; mkdir com/hbase; mkdir com/hbase/log; mkdir com/hbase/log/util
vi com/hbase/log/util/AsyncHbaseLogEventSerializer.java
export CLASSPATH=/usr/lib/flume-ng/lib/*
javac com/hbase/log/util/AsyncHbaseLogEventSerializer.java
jar cf LogEventUtil.jar com
jar tf LogEventUtil.jar com
chmod 775 LogEventUtil.jar
cp LogEventUtil.jar /usr/lib/flume-ng/lib

Next, we had to run the following command before enabling Flume with this setup, because of an issue we found where ZooKeeper stopped Flume from working in this configuration:

mv /etc/zookeeper/conf/zoo.cfg /etc/zookeeper/conf/zoo.cfg-unused

and finally, we start up the Flume target server agent, followed by the source one (again see the previous article for setting up the source Flume agent):

flume-ng agent -c conf -f /etc/flume-ng/conf/flume-trg-agent-hbase.conf -n collector

Then, after a while, log data starts getting loaded into the HBase table. You can check on it using Hue, and the HBase Browser:

[Screenshot: the apache_access_log table viewed in Hue’s HBase Browser]

Or you can go back into the HBase shell and run the scan command to view the data, with each line of output representing a single cell in the overall table storage:

hbase(main):001:0> scan 'apache_access_log'
ROW                   COLUMN+CELL                                               
 1400628560331        column=common:eventtimestamp, timestamp=1400628560350, val
                      ue=20/May/2014:15:28:06 +0000                             
 1400628560331        column=common:hostname, timestamp=1400628560335, value=89.
                      154.89.101                                                
 1400628560331        column=common:remotehost, timestamp=1400628560336, value=-
 1400628560331        column=common:remoteuser, timestamp=1400628560338, value=-
 1400628560331        column=common:rowKey, timestamp=1400628560333, value=14006
                      28560331                                                  
 1400628560331        column=http:requestmethod, timestamp=1400628560352, value=
                      GET / HTTP/1.1                                            
 1400628560331        column=http:requeststatus, timestamp=1400628560356, value=
                      200                                                       
 1400628560331        column=http:responsebytes, timestamp=1400628560358, value=
                      9054                                                      
 1400628560331        column=misc:agent, timestamp=1400628560377, value=Mozilla/
                      5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.75.
                      14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14     
 1400628560331        column=misc:referrer, timestamp=1400628560359, value=-    
 1400628560344        column=common:eventtimestamp, timestamp=1400628560383, val
                      ue=20/May/2014:15:28:06 +0000  

This is all great, and a good starting point if you plan to process your data with other Java programs as the next step. But what if you want to view the data in a more convenient way, perhaps as a regular table? To do that you can use Hive again, this time using Hive’s HBase integration features to tell it that the data is stored in HBase, and how to map the various HBase column families and qualifiers onto regular columns. In our case, the DDL to create the corresponding Hive table looks like this:

DROP TABLE IF EXISTS hive_apache_access_log;
CREATE EXTERNAL TABLE hive_apache_access_log
(
unixtimestamp string,
eventtimestamp string,
hostname string,
remotehost string,
remoteuser string,
requestmethod string,
requeststatus string,
responsebytes string,
agent string,
referrer string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,common:eventtimestamp,common:hostname,common:remotehost,common:remoteuser,http:requestmethod,http:requeststatus,http:responsebytes,misc:agent,misc:referrer')
TBLPROPERTIES ('hbase.table.name' = 'apache_access_log');

This gives us the ability, either from the Hive shell like this or from tools such as OBIEE and ODI, to query the NoSQL database and bring its data into more regular, relational data stores.

hive> select * from hive_apache_access_log;
OK
1400628560331   20/May/2014:15:28:06 +0000  89.154.89.101   -   -   GET / HTTP/1.1  200 9054    Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14 -
1400628560344   20/May/2014:15:28:06 +0000  89.154.89.101   -   -   GET /wp-content/plugins/crayon-syntax-highlighter/css/min/crayon.min.css?ver=2.5.0 HTTP/1.1 304 -   Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14 http://www.rittmanmead.com/
1400628560345   20/May/2014:15:28:06 +0000  89.154.89.101   -   -   GET /wp-content/plugins/jetpack/modules/widgets/widgets.css?ver=20121003 HTTP/1.1 304 -   Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14 http://www.rittmanmead.com/
...

We’ll be covering more on HBase, and Oracle’s NoSQL Database, in future articles on the blog.