Simple Data Manipulation and Reporting using Hive, Impala and CDH5

Although I’m pretty clued-up on OBIEE, ODI, Oracle Database and so on, I’m relatively new to the worlds of Hadoop and Big Data, so most evenings and weekends I play around with Hadoop clusters on my home VMware ESXi rig and try to get some experience that might then come in useful on customer projects. A few months ago I went through an example of loading up flight delays data into Cloudera CDH4 and then analysing it using Hive and Impala, but realistically it’s unlikely the data you’ll analyse in Hadoop will come in such convenient, tabular form. Something more realistic is analysing log files from web servers or other high-volume, semi-structured sources, so I asked Robin to download the most recent set of Apache log files from our website, and I thought I’d have a go at analysing them using Pig and Hive, and maybe visualise the output using OBIEE (if possible, later on).

As I said, I’m not an expert in Hadoop and the Cloudera platform, so I thought it’d be interesting to describe the journey I went through, and also share some observations on when to use Hive and when to use Pig, when products like Cloudera Impala could be useful, and the general state of play with the Cloudera Hadoop platform. The files I started off with were Apache weblog files, ten in total, with sizes ranging from around 2MB to 350MB.

[Screenshot: the ten Apache log files and their sizes]

Looking inside one of the log files, the entries are in the standard Apache log file format (or “combined log format”), which records the visitor’s IP address, the date and time of access, some other request details, and the page (or resource) they requested:

[Screenshot: sample entries from one of the log files]
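A typical combined-format entry looks something like this - a made-up example rather than a line from our actual logs, but with the same fields in the same order:

192.0.2.10 - - [21/May/2014:10:05:12 +0000] "GET /blog/2014/05/some-post/ HTTP/1.1" 200 18745 "http://www.google.com/" "Mozilla/5.0 (Windows NT 6.1; rv:29.0) Gecko/20100101 Firefox/29.0"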

What I’m looking to do is count the number of visitors per day, work out which were the most popular pages, see what time of day we’re busiest, and so on. I’ve got a Cloudera Hadoop CDH5.0 6-node cluster running on a VMware ESXi server at home, so the first thing to do is log into Hue, the web-based developer and administration tool that comes with CDH5, and upload the files to a directory on HDFS (Hadoop Distributed File System), the Unix-like clustered file system that underpins most of Hadoop.

[Screenshot: uploading the log files to HDFS through Hue]

You can, of course, SFTP the files to one of the Hadoop nodes and use the “hadoop fs” command-line tool to copy them into HDFS, but for relatively small files like these it’s easier to use the web interface to upload them from your workstation. Once I’ve done that, I can then view the log files in the HDFS directory, just as if they were sitting on a regular Unix filesystem.

[Screenshot: the log files listed in their HDFS directory]

At this point though, the files are still “unstructured” - just a single log entry per line - and I’ll therefore need to do something before I can count things like the number of hits per day, which pages were requested and so on. At this beginner’s level, there are two main options you can use - Hive, a SQL interface over HDFS that lets you select from, and do set-based transformations with, files of data; or Pig, a more procedural language that lets you manipulate file contents as a series of step-by-step tasks. For someone like myself with a relational data warehousing background, Hive is probably easier to work with, but it comes with some quite significant limitations compared to a database like Oracle - we’ll see more on this later.

Whilst Hive tables are, at the simplest level, mapped onto comma or otherwise-delimited files, another neat feature in Hive is that you can use what’s called a “SerDe”, or “Serializer-Deserializer”, to map more complex file structures into regular table columns. In the Hive DDL script below, I use this SerDe feature to have a regular expression parse the log files into columns, with the data source being an entire directory of files, not just a single one:

CREATE EXTERNAL TABLE apachelog (
  host STRING,
  identity STRING,
  user STRING,
  time STRING,
  request STRING,
  status STRING,
  size STRING,
  referer STRING,
  agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?",
  "output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
)
STORED AS TEXTFILE
LOCATION '/user/root/logs';

Things to note in the above DDL are:

  • EXTERNAL table means that the datafile used to populate the Hive table sits somewhere outside Hive’s usual /user/hive/warehouse directory, in this case in the /user/root/logs HDFS directory.
  • ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe' tells Hive to use the Regular Expressions Serializer-Deserializer to interpret the source file contents, and
  • WITH SERDEPROPERTIES … gives the SerDe the regular expression to use, in this case to decode the Apache log format.

Probably the easiest way to run the Hive DDL command to create the table is to use the Hive query editor in Hue, but there are a couple of things you’ll need to do before this particular command will work:

  1. You’ll need to get hold of the JAR file in the Hadoop install that provides this SerDe (hive-contrib-0.12.0-cdh5.0.0.jar) and then copy it to somewhere on your HDFS filesystem, for example /user/root. In my CDH5 installation, this file was at /opt/cloudera/parcels/CDH/lib/hive/lib/, but it’ll probably be at /usr/lib/hive/lib if you installed CDH5 using the traditional packages (rather than parcels) route. Also, if you’re using a version of CDH prior to 5, the version number in the filename will differ accordingly. This JAR file then needs to be accessible to Hive, and whilst there are various more-permanent ways you can do this, the easiest is to point to the JAR file in an entry in the query editor File Resources section as shown below (there’s also an ADD JAR alternative for the Hive command line, sketched just after this list).

  2. Whilst you’re there, un-check the “Enable Parameterization” checkbox, otherwise the query editor will interpret the SerDe output string as parameter references.
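If you’re working from the Hive command line rather than Hue, an alternative to the File Resources entry is to add the JAR to your session explicitly with an ADD JAR statement - a minimal sketch, assuming the parcel install path mentioned in step 1:

ADD JAR /opt/cloudera/parcels/CDH/lib/hive/lib/hive-contrib-0.12.0-cdh5.0.0.jar;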

[Screenshot: the Hue Hive query editor, with the SerDe JAR listed under File Resources and Enable Parameterization un-checked]

Once the CREATE TABLE command has completed, you can click over to the Hive Metastore table browser and see the columns in the new table.

[Screenshot: the apachelog table columns in the Hive Metastore table browser]

Behind the scenes, Hive maps its table structure onto all the files in the /user/root/logs HDFS directory, and when I run a SELECT statement against it - for example, to do a simple row count - MapReduce mappers, shufflers and sorters are spun up to return the count of rows to me.
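The count in the screenshot below comes from the simplest of aggregate queries, for example:

SELECT COUNT(*) FROM apachelog;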

[Screenshot: the row-count query running as a MapReduce job]

But in its current form, this table still isn’t all that useful - I’ve just got raw IP addresses for page requesters, and the request date is in a format that’s not easy to work with. So let’s do some further manipulation, creating another table that splits the request date out into year, month, day and time, using Hive’s CREATE TABLE AS SELECT command to transform and then load in one command:

CREATE TABLE apachelog_date_split_parquet
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
  STORED AS 
    INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
    OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
AS
SELECT host,
       identity,
       user,
       substr(time,9,4)  year,   -- time column holds values like [21/May/2014:10:05:12 +0000]
       substr(time,5,3)  month,
       substr(time,2,2)  day,
       substr(time,14,2) hours,
       substr(time,17,2) mins,
       substr(time,20,2) secs,
       request,
       status,
       size,
       referer,
       agent
FROM   apachelog
; 

Note the ParquetHive SerDe I’m using in this table’s row format definition - Parquet is a compressed, column-store file format developed by Cloudera originally for Impala (more on that in a moment), and from CDH4.6 it’s also available for Hive and Pig. By using Parquet we potentially get speed and space-saving benefits compared to regular delimited files, so let’s use that feature now and see where it takes us. After creating the new Hive table, I can then run a quick query to count web server hits per month:
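The hits-per-month count itself is just a simple GROUP BY over the new table - something along these lines, using the year and month columns created by the CTAS above:

SELECT   year, month, COUNT(*) AS hits
FROM     apachelog_date_split_parquet
GROUP BY year, month;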

[Screenshot: query results showing web server hits per month]

So - getting more useful, but it’d be even nicer if I could map the IP addresses to actual countries, so I can see how many hits came from the UK, how many from the US, and so on. To do this, I’d need to use a lookup service or table to map my IP addresses to countries or cities, and one commonly-used such service is the free GeoIP database provided by MaxMind: you turn an IP address A.B.C.D into an integer using the formula (A × 16777216) + (B × 65536) + (C × 256) + D, and then do a BETWEEN to locate that integer within the ranges defined in the database. How best to do this though?

There are several ways that you can enhance and manipulate data in your Hadoop system like this. One way, and something I plan to look at on this blog later in this series, is to use Pig, potentially with a call-out to Perl or Python to do the lookup on a row-by-row (or tuple-by-tuple) basis - this blog article on the Cloudera site goes through a nice example. Another way, again something I plan to cover in this series, is to use something called “Hadoop Streaming” - the ability within MapReduce to “subcontract” the map and reduce parts of the operation to external programs or scripts, in this case a Python script that again queries the MaxMind database to do the IP-to-country lookup.

But surely it’d be easiest to just calculate the IP address integer and join my existing Hive table to this GeoIP lookup table? Let’s start by trying to do this, first by modifying my final table design to include the IP address integer calculation defined on the MaxMind website:

CREATE TABLE apachelog_date_ip_split_parquet
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
  STORED AS 
    INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
    OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
AS
SELECT host,
      (cast(split(host,'\\.')[0] as bigint) * 16777216)   -- each octet weighted by a power of 256 (256^3, 256^2, 256, 1)
     + (cast(split(host,'\\.')[1] as bigint) * 65536) 
     + (cast(split(host,'\\.')[2] as bigint) * 256) 
     + (cast(split(host,'\\.')[3] as bigint)) ip_add_int,
       identity,
       user,
       substr(time,9,4)  year,
       substr(time,5,3)  month,
       substr(time,2,2)  day,
       substr(time,14,2) hours,
       substr(time,17,2) mins,
       substr(time,20,2) secs,
       request,
       status,
       size,
       referer,
       agent
FROM   apachelog
;
 

Now I can query this from the Hive query editor, and I can see the IP address integer calculations that I can then use to match to the GeoIP IP address ranges.
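For example, a quick query along these lines brings back the original host IP alongside its integer equivalent:

SELECT host, ip_add_int
FROM   apachelog_date_ip_split_parquet
LIMIT  10;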

[Screenshot: query results showing host IP addresses and their integer equivalents]

I then upload the IP address-to-country CSV file from the MaxMind site to HDFS, and define a Hive table over it like this:

CREATE EXTERNAL TABLE geo_lookup (
  ip_start      string,
  ip_end        string,
  ip_int_start  bigint,   -- bigint rather than int, as the upper IP ranges overflow a 32-bit integer
  ip_int_end    bigint,
  country_code  string,
  country_name  string
  )
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '|' 
LOCATION '/user/root/lookups/geo_ip';
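As a quick sanity check that the lookup file has been parsed into columns correctly, a query along these lines should bring back a handful of country names and their IP integer ranges:

SELECT country_name, ip_int_start, ip_int_end
FROM   geo_lookup
LIMIT  5;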

Then I try some variations on the BETWEEN clause, in a SELECT with a join:

select a.host, l.country_name
from apachelog_date_ip_split_parquet a join geo_lookup l 
on (a.ip_add_int > l.ip_int_start) and (a.ip_add_int < l.ip_int_end)
group by a.host, l.country_name;

select a.host, l.country_name
from apachelog_date_ip_split_parquet a join geo_lookup l 
on a.ip_add_int between l.ip_int_start and l.ip_int_end;

.. which all fail, because Hive only supports equi-joins. One option is to use a Hive UDF (user-defined function) such as this one here to implement a GeoIP lookup, but something that’s probably a bit more promising is to switch over to Impala, which can express non-equality joins through its CROSS JOIN syntax (Hive can in fact also do cross joins, but they’re not very efficient). Impala also has the benefit of being much faster for BI-type queries than Hive, and it’s designed to work well with Parquet, so let’s switch over to the Impala query editor, run the “invalidate metadata” command to re-sync its table view with Hive’s table metastore, and then try the join in there:
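The Impala version of the join uses CROSS JOIN with the BETWEEN predicate moved into the WHERE clause; the query I ran was along these lines (the exact column list in the screenshot may differ slightly):

-- re-sync Impala's view of the Hive metastore first
INVALIDATE METADATA;

SELECT   l.country_name, COUNT(*) AS hits
FROM     apachelog_date_ip_split_parquet a
         CROSS JOIN geo_lookup l
WHERE    a.ip_add_int BETWEEN l.ip_int_start AND l.ip_int_end
GROUP BY l.country_name
ORDER BY hits DESC
LIMIT    20;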

[Screenshot: the Impala query editor returning hits per country]

Not bad. Of course this is all fairly simple stuff, and we’re still largely working with relational-style set-based transformations. In the next two posts in the series though I want to get a bit deeper into Hadoop-style transformations - first by using a feature called “Hadoop Streaming” to process data on its way into Hadoop, in parallel, by calling out to Python and Perl scripts; and then by taking a look at Pig, the more “procedural” alternative to Hive - with the objective being to enhance the current dataset to bring in details of the pages being requested, filter out the non-page requests, and do some work with author, tag and clickstream analysis.