End-to-End ODI12c ETL on Oracle Big Data Appliance Pt.4 : Transforming Data using Python & Hive Streaming

June 12th, 2014 by

This week I’m taking an in-depth look at ETL on the Oracle Big Data Appliance, using Oracle Data Integrator 12c to call the various bits of Hadoop functionality and orchestrate the whole process. So far, I’ve landed web log data into the Hadoop cluster using Flume, created a Hive table over the log data using Hive and the RegEx SerDe, then used further Hive transformations to join this data to other reference data, some of which came from an Oracle database via Sqoop. Here’s a complete listing of the posts so far, and the ones to come:

Our next transformation is a bit trickier though. The data we’re bringing in from our webserver has IP addresses recorded for each HTTP access, and what we want to do is use that IP address to identify the country that the website visitor was located in. Geocoding, as it’s called, is fairly easy to do if your data is stored in a database such as Oracle Database or you’re using a language like Python, with two steps needed to retrieve the IP address’s country:

1. First convert the IP address into a single integer, like this:

2. Then, using a freely-downloadable database from MaxMind, look to see which range of numbers your converted IP address is in:
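
As a minimal sketch of those two steps in Python (not the original listings): the ranges below are invented purely for illustration and stand in for rows from the MaxMind country database, and `ip_to_int` and `country_for` are hypothetical helper names.

```python
import bisect

def ip_to_int(ip):
    """Convert a dotted-quad IPv4 address to a single integer."""
    a, b, c, d = (int(part) for part in ip.split("."))
    return (a << 24) | (b << 16) | (c << 8) | d

# Made-up (start, end, country) ranges, sorted by start; a real lookup
# would load these from the MaxMind country database instead.
RANGES = [
    (ip_to_int("10.0.0.0"), ip_to_int("10.255.255.255"), "Country A"),
    (ip_to_int("192.168.0.0"), ip_to_int("192.168.255.255"), "Country B"),
]
STARTS = [start for start, _, _ in RANGES]

def country_for(ip):
    """Return the country whose range contains ip, or None if no range matches."""
    n = ip_to_int(ip)
    i = bisect.bisect_right(STARTS, n) - 1  # last range starting at or before n
    if i >= 0 and n <= RANGES[i][1]:
        return RANGES[i][2]
    return None
```

The second step is a range test (start <= n <= end) rather than an equality match, which is what makes it awkward to express as a join in Hive.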

The only problem is that Hive doesn’t support anything other than equi-joins, so the range lookup in step 2 can’t be written as a join condition. In a previous blog post I got around this issue by using Impala rather than Hive, and in another one I used Pig, and a custom Python UDF, to do the geocoding outside of the main Pig data flow. I could use Pig and a Python UDF in this situation, creating an ODI Procedure to call Pig from the command-line, but instead I’ll use another ODI KM, IKM Hive Transform, to call a Python script while keeping everything controlled from within ODI.

To set this geocoding process up, I first create a new ODI mapping, with my current Hive table as the source, and another Hive table, this time with an additional column for the IP address country, as the target. I then map what columns I can into the target, leaving just the country column without a mapping. Notice I’ve put the source table within an ODI12c dataset; this isn’t mandatory, it’s just something I decided to do at the time, and it doesn’t affect the rest of the process.


Now in a similar way to how I created the Python UDF to do the country lookup for Pig, I’ll create a variation of that script that expects columns of data as an input, outputs other columns of data, and does the geocoding within the script by calling the Python GeoIP API provided by MaxMind. Most importantly though, I’ll use it in conjunction with IKM Hive Transform’s ability to stream its Hive data through an arbitrary script, making use of the Hive TRANSFORM function and a feature called “Hive streaming”. To keep things simple let’s first set it up outside of the KM, see how it works, and then configure the KM to use this Python script as its transformation function.

Let’s start by looking at the source Hive table I’ll be starting with:

The target table is exactly the same, except for the additional column for country:

The way IKM Hive Transform works is by streaming each incoming row of Hive data to the script registered with it, with each row’s columns tab-separated. In turn, it expects the script to output similar rows of tab-separated values, each line terminated with a newline. My script, therefore, needs to accept an arbitrary number of incoming rows, parse each row into its constituent columns, call the MaxMind API to do the geocoding, and then output a tab-separated set of columns for each row it processes (a bit like pipelined table functions in Oracle PL/SQL).

So here’s the Python script that does this:

A few things to note about the script:

1. “import sys” brings in the Python library that then allows me to read the input data, via sys.stdin.
2. I’ve previously installed the MaxMind GeoIP API, database and libraries, with the GeoIP.dat database stored alongside the script in /tmp. “import pygeoip” brings in the Python library to use the API.
3. The bit that does the geocoding is “gi.country_name_by_addr”, which calls the MaxMind API. It’s this bit that replaces the need for a join with a BETWEEN clause in HiveQL.
4. To output data back to the calling Hive transformation, I just “print” it, using “\t” for tab.
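
Putting those notes together, a script along these lines would fit the description. This is a sketch rather than the original listing: it assumes the IP address is the first column of each row (the hypothetical `ip_col` parameter), falls back to "Unknown" when no country is found, and uses `transform` and `main` as illustrative names. The `/tmp/GeoIP.dat` path and the `gi.country_name_by_addr` call are the ones mentioned above.

```python
import sys

def transform(lines, lookup, ip_col=0):
    """Yield each tab-separated input row with a country column appended."""
    for line in lines:
        cols = line.rstrip("\n").split("\t")
        try:
            country = lookup(cols[ip_col]) or "Unknown"
        except Exception:
            # Assumption: a bad or missing address should not abort the whole job.
            country = "Unknown"
        yield "\t".join(cols + [country])

def main():
    # Imported here so transform() can be exercised without the library installed.
    import pygeoip
    gi = pygeoip.GeoIP("/tmp/GeoIP.dat")
    for row in transform(sys.stdin, gi.country_name_by_addr):
        print(row)

# When saved as the streaming script for Hive, finish the file with a call to main().
```

Keeping the lookup as a parameter means the row handling can be checked with a plain dictionary lookup before the MaxMind library and database are in place on the cluster.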

One thing we’ll need to do though, similar to how we had to copy the JAR file used by Hive for the RegEx SerDe around all the nodes in the cluster, is copy the GeoIP.dat file used by the MaxMind geocoding API to the same place on all the cluster nodes. We’ll also have to set the permissions on this file so it’s readable and writeable by the Hive/YARN process on each Hadoop node:

I also need to make sure the MaxMind geocoding Python API is installed on each node too:
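
Once both are in place, a small check like the following could be run on each node to confirm the setup. It is only a sketch: `check_node` is a hypothetical helper, and it reports on whichever user runs it, so it would need to be run as the Hive/YARN user to say anything about that user's access.

```python
import importlib.util
import os

def check_node(dat_path="/tmp/GeoIP.dat", module="pygeoip"):
    """Report whether the GeoIP database and Python module are usable on this host."""
    return {
        "dat_exists": os.path.isfile(dat_path),
        "dat_readable": os.access(dat_path, os.R_OK),
        "module_installed": importlib.util.find_spec(module) is not None,
    }
```

Calling `check_node()` with no arguments checks the `/tmp/GeoIP.dat` location used in this post.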

Now I can run a HiveQL command and test out the script. I’ll start up a Hive shell session, and the first thing I need to do is register the script with Hive, which distributes it across the cluster and puts it in the cache. Then I can select against the source table, using the TRANSFORM … USING clause:
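
The same streaming contract can also be exercised locally, without Hive, by piping tab-separated rows into a script and reading back what it prints. The harness below is a rough sketch of that idea and not the HiveQL from the session: `stream_through` and `demo` are hypothetical names, the sample row is made up, and a tiny stand-in script takes the place of the real geocoding script so the example runs anywhere.

```python
import os
import subprocess
import sys
import tempfile

# Stand-in streaming script (hypothetical): appends a fixed value to each row,
# where the real script would append the looked-up country.
STAND_IN = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    print(line.rstrip('\\n') + '\\tCOUNTRY')\n"
)

def stream_through(script_path, rows):
    """Send rows to script_path as tab-separated lines; return its output rows."""
    payload = "".join("\t".join(row) + "\n" for row in rows)
    result = subprocess.run(
        [sys.executable, script_path],
        input=payload, capture_output=True, text=True, check=True,
    )
    return [line.split("\t") for line in result.stdout.splitlines()]

def demo():
    """Write the stand-in script to a temp file and push one sample row through it."""
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
        f.write(STAND_IN)
    try:
        return stream_through(f.name, [["1.2.3.4", "GET /index.html", "200"]])
    finally:
        os.unlink(f.name)
```

Pointing `stream_through` at the real script instead of the stand-in gives a quick way to catch parsing problems before the script is registered with Hive.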

So it looks like it’s working. Let’s move back to ODI now and reference the script within the IKM Hive Transform KM settings. Note that I enter the name and filesystem location of the script in the KM settings, but I don’t use the TRANSFORM_SCRIPT option to key the script contents directly into the KM. That should work, and if you do so the KM will write the script contents out to the agent’s host filesystem at the start of the process, but the KM in this ODI12c release has an issue parsing these keyed-in scripts. You’re best leaving that setting empty, in which case the KM will just use the file already in the filesystem.


After running the mapping, I then go over to the Operator navigator and see that it’s run successfully. Looking at the code the KM generates, I can see it referencing our script in the HiveQL statement, with the script outputting the extra country column that’s then loaded into the target table along with the other columns.


Then finally, checking the target table, I can see that each row in my final Hive table has the country name alongside each log file entry, along with the IP address and other details.

So – now we’re at the stage where we’ve finished the processing with the Hadoop cluster, and I’d like to copy the final set of data into an Oracle database, so that I can analyse it using Oracle SQL and other tools. In the final installment in this series we’ll do just that, using IKM File/Hive to Oracle (OLH/ODCH) and Oracle Loader for Hadoop, one of the Oracle Big Data Connectors.
