Adding Geocoding Capabilities to Pig through Custom UDFs

In the previous two posts in this series, I’ve used Hive and Pig to process and analyse data from log files generated by the Rittman Mead Blog webserver. In the article on Hive, I created a relational table structure over a directory containing the log files, using a regular expression SerDe to split each log line up into its constituent parts (IP address, page requested, status code and so on). I then brought in another table of data, containing IP address ranges and countries they were assigned to, so that I could determine what parts of the world were accessing our site over time.

In the second example, I took the same log files but processed them this time using Apache Pig. I used Pig’s dataflow-style approach to loading and analysing data to progressively filter, pivot and analyse the log file dataset, and then joined it to an export of pages and authors from WordPress, the CMS we use to run the website, so I could determine who wrote the most popular blog articles over the period covered by the logs.

But the first example, where I joined the log file data to the geocoding table, had a bit of an issue that only came up when I tested it with a larger set of data than I used at the end of that article. In the article example I limited the number of log file rows to just five, to keep the example readable on the blog, but when I tried it later with the full dataset, the query eventually failed with an out-of-memory error from the Hadoop cluster. In practice I could probably have increased the memory (Java heap space) or otherwise got the query through, but geo-tagging my data in this way - as a big table join, and using an in-memory database engine (Impala) to do it - probably isn’t the most sensible way to do a single-value lookup as part of a Hadoop transformation; instead, this is probably something better done through what’s called a “user-defined function”.

Both Hive and Pig support user-defined functions (UDFs), and a quick Google search brought up one for Hive called GeocodeIP, on GitHub, that looks like it might do the job. Sticking with Pig for the moment though, we thought this might be a good opportunity to see how UDFs for Pig are created, and so my colleague, Nelio Guimaraes, put together the following example to walk through how a typical one might be created. Before we start though, a bit of background.

The problem we have is that we need to match an IP address in a webserver log file with an IP address range in a lookup table. For example, the IP address in the log file might be 124.232.100.105, and the lookup database would have an IP address range from, say, 124.232.100.0 to 124.232.100.255 which is allocated to a particular country - Poland, for example. The lookup database itself comes from Maxmind, and there’s a formula they use to convert IP addresses to integers, like this:

ip_number = 16777216*w + 65536*x + 256*y + z   (for an IP address of the form w.x.y.z)

so that you can do a simple BETWEEN in an SQL join to locate the range that matches the incoming IP address.

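To make that concrete, here’s the same conversion written out as a few lines of Python - purely an illustration of the formula, not part of the UDF we’ll build below:

def ip_to_number(ip):
    # Convert a dotted-quad address w.x.y.z into the integer used by the
    # Maxmind lookup table: 16777216*w + 65536*x + 256*y + z
    w, x, y, z = [int(octet) for octet in ip.split('.')]
    return 16777216*w + 65536*x + 256*y + z

# ip_to_number('124.232.100.105') returns 2095604841, which falls between
# ip_to_number('124.232.100.0') and ip_to_number('124.232.100.255')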

Except that Pig, like Hive, can’t normally support non-equijoins, which leads us to UDFs and other approaches to getting the country for our IP address. Pig, again like Hive, is however extensible, and it’s relatively easy to add Pig UDFs either yourself or through UDF libraries like Pig’s Piggybank. The best language to write UDFs in is Java, as it gives access to the largest amount of Pig native functionality (such as the ability to write custom data loaders and unloaders), but for what we’re doing Python will be fine, so let’s put one together in Python to do our IP address geocoding and show how the process works.

Going back to the Pig scripts we put together yesterday, they started off by declaring a relation that loaded the raw log files in from a directory in HDFS, and then used another relation to parse the first one via a regular expression, so that we had each of the log file elements in its own column, like this:

raw_logs = LOAD '/user/root/logs/' USING TextLoader AS (line:chararray);
logs_base = FOREACH raw_logs
 GENERATE FLATTEN
 (
  REGEX_EXTRACT_ALL
  (
   line,
   '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"'
  )
 )
 AS
 (
  remoteAddr: chararray, remoteLogname: chararray, user: chararray,
  time: chararray, request: chararray, status: int, bytes_string: chararray,
  referrer: chararray, browser: chararray
 );

At this point we can run off a list of the top 5 browser types based on page accesses, for example:

grunt> by_browser_group = GROUP logs_base BY browser;                              
grunt> by_browser_group_count = FOREACH by_browser_group  
>> GENERATE group, COUNT($1);
grunt> by_browser_group_sorted = ORDER by_browser_group_count BY $1 DESC;
grunt> by_browser_group_top_5 = LIMIT by_browser_group_sorted 5;
grunt> dump by_browser_group_top_5
...
(Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.154 Safari/537.36,675161)
(-,394101)
(Mozilla/5.0 (compatible; monitis.com - free monitoring service; http://monitis.com),371078)
(Mozilla/5.0 (Windows NT 6.1; WOW64; rv:28.0) Gecko/20100101 Firefox/28.0,201861)
(Mozilla/5.0 (Windows NT 6.1; WOW64; rv:27.0) Gecko/20100101 Firefox/27.0,167851)

See the previous post, and this other blog article, for more background on how you do grouping and aggregating in Pig, if you’re not familiar with the syntax.

But what if we want to group by country? That’s where the geocoding comes in, and the Pig UDF we’re going to create. As we’re going to create this example using Python, we’ll be using the pygeoip Python API, which you install through pip, the Python package manager, and the GeoLite Country database (.dat) file from Maxmind, who make this basic version available for free download. Then follow these steps to set the Python UDF up:

  1. On the master node on your Hadoop cluster (I’m using a three-node CDH4.6 cluster) where Pig and Hive run, install pip, and then use it to install the pygeoip API, like this:
wget https://raw.github.com/pypa/pip/master/contrib/get-pip.py
python get-pip.py 
pip install pygeoip
  2. Copy the GeoIP.dat file to somewhere on the Hadoop master node, for example /home/nelio/. Make a note of the full path to the GeoIP.dat file, and then copy the file to the same location on all of the worker nodes - there’s probably a way to cache this or otherwise automatically distribute the file (suggestions welcome), but for now this will ensure that each worker node can get access to a copy of this file.

  3. Using a text editor, create the Python script that will provide the Python UDF, like this, substituting the path to your GeoIP.dat file if it’s somewhere else. Once done, save the file as python_geoip.py to the same directory (/home/nelio, in this example) - note that this file only needs to go on the main, master node in your cluster, and Pig/MapReduce will distribute it to the worker nodes when we register it in our Pig script, later on.

#!/usr/bin/python

import sys
# Make sure Jython can find the pygeoip module we installed through pip
sys.path.append('/usr/lib/python2.6/site-packages/')
import pygeoip

@outputSchema("country:chararray")
def getCountry(ip):
    # Look the IP address up in the Maxmind GeoLite Country database
    gi = pygeoip.GeoIP('/home/nelio/GeoIP.dat')
    country = gi.country_name_by_addr(ip)
    return country

Note that the sys.path.append line in the script is there so that Jython knows where to look for the new Python module, pygeoip, when starting up.
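One thing to be aware of with the script above is that it opens the GeoIP.dat file on every call to getCountry(). If that turns out to be slow over millions of rows, a variant along these lines (a sketch only, using the same pygeoip calls) opens the database once when Pig loads the module and reuses it for every row:

#!/usr/bin/python

import sys
sys.path.append('/usr/lib/python2.6/site-packages/')
import pygeoip

# Open the GeoLite Country database once, when the module is loaded,
# rather than once per input row
gi = pygeoip.GeoIP('/home/nelio/GeoIP.dat')

@outputSchema("country:chararray")
def getCountry(ip):
    return gi.country_name_by_addr(ip)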

  4. Let’s start another Pig session now and try to use this UDF. I exit back to the OS command prompt, change directory to where I’ve stored the Python file and the GeoIP.dat file, and start another Grunt shell session:
[root@cdh4-node1 ~]# cd /home/nelio
[root@cdh4-node1 nelio]# pig

Now if I want to use this Python Pig UDF in a Grunt shell session, I need to register it as a scripting UDF, either at the Grunt shell or in a Pig script I run, so let’s start with that, and then bring in the log data as before:

grunt> register 'python_geoip.py' using jython as pythonGeoIP;
...
grunt> raw_logs = LOAD '/user/root/logs/' USING TextLoader AS (line:chararray);
grunt> logs_base = FOREACH raw_logs
>>  GENERATE FLATTEN
>>         (
>>                 REGEX_EXTRACT_ALL
>>                 (
>>                         line,
>>                         '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"'
>>                 )
>>         )
>>         AS
>>         (
>>                 remoteAddr: chararray, remoteLogname: chararray, user: chararray,
>>                 time: chararray, request: chararray, status: int, bytes_string: chararray,
>>                 referrer: chararray, browser: chararray
>>         ); 

I’ll now define a new relation (alias) projecting just the IP addresses from the combined log files, and then filter that into another alias so we only have valid IP addresses to work with:

grunt> ipaddress =  FOREACH logs_base GENERATE remoteAddr;
grunt> clean_ip  = FILTER ipaddress BY (remoteAddr matches '^.*?((?:\\d{1,3}\\.){3}\\d{1,3}).*?$');

Now we get to use the Python UDF. We’ll pass these IP addresses to it, and the UDF will return the country that each IP address is located in, based on Maxmind’s IP address ranges.

grunt> country_by_ip = FOREACH clean_ip GENERATE pythonGeoIP.getCountry(remoteAddr);
2014-05-05 05:22:50,141 [MainThread] INFO  org.apache.pig.scripting.jython.JythonFunction - Schema 'country:chararray' defined for func getCountry
grunt> describe country_by_ip
country_by_ip: {country: chararray}

So this function will have converted all of the IP addresses in the logs to country names; let’s now group, count, order and select the top five from that list.

grunt> group_by_country = GROUP country_by_ip BY country;
grunt> count_by_country = FOREACH group_by_country GENERATE FLATTEN(group) as country, COUNT(country_by_ip) AS (hits_per_country:long);
grunt> order_by_access = ORDER count_by_country BY hits_per_country DESC;
grunt> top5_country = LIMIT order_by_access 5;

Of course, all this has done so far is set up a dataflow in Pig, telling it how to move the data through the pipeline and arrive at the final output I’m interested in; let’s now run the process by using the “dump” command:

grunt> dump top5_country
...
(United States,2012311)
(India,785585)
(United Kingdom,459422)
(Germany,231386)
(France,168319)

So that’s a simple example of a Pig UDF, in this instance written in Python. There are other ways to extend Pig beyond UDFs - Pig Streaming is the obvious alternative, where the entire relation goes through the streaming interface to be processed and then output back into Pig, and hopefully we’ll cover this at some point in the future - or then again, maybe it’s now time to take a proper look at Spark.
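To give a flavour of the streaming approach, here’s a rough sketch of what the Python side of a STREAM step might look like - by default Pig passes each tuple to the script as a tab-delimited line on stdin and reads tab-delimited lines back from stdout (the field position and GeoIP.dat path below are assumptions, just for the sake of the example):

#!/usr/bin/python
# Sketch of a Pig streaming script: read tab-delimited tuples from stdin,
# geocode the first field (assumed to be the IP address), write to stdout.
import sys
sys.path.append('/usr/lib/python2.6/site-packages/')
import pygeoip

gi = pygeoip.GeoIP('/home/nelio/GeoIP.dat')

for line in sys.stdin:
    ip = line.rstrip('\n').split('\t')[0]
    print '\t'.join([ip, gi.country_name_by_addr(ip)])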