Simple Hadoop Dataflows using Apache Pig and CDH4.6

The other day I took some logs from the Apache webserver that runs the Rittman Mead website, and analysed them using Hadoop CDH5, Apache Hive and Impala to get some basic metrics on number of hits per month, where the hits came from and so on. Hive and Impala are great for analysing data sitting on HDFS on a Hadoop cluster, but like SQL compared to PL/SQL or C++, everything you do is declarative and set-based, whereas sometimes you want to build up your dataset using a dataflow-type approach, particularly if you’ve come from a programming rather than a data warehousing background.

If you’ve been looking at Hadoop for a while, you’ll probably know that there’s another basic high-level-language approach to querying Hadoop data to accompany Hive, and it’s called “Pig”. Pig, like Hive, is an Apache project and provides an engine for creating and executing data flows, in parallel, on Hadoop. As with Hive, jobs you create in Pig eventually translate into MapReduce jobs (with the advantages and disadvantages that this brings), and Pig has concepts that are similar – but just that little bit different – to relational flows, such as filters, joins and sorts.

It’s often called a “procedural” language (as opposed to Hive’s declarative language), but really it’s not – it’s a “data flow language” that has you specifically set out the data flow as the main part of a Pig program, rather than it being a by-product of the if/then/elses and control structures of a procedural language. For people like me who come from an Oracle data warehousing background, in most cases we’d feel more comfortable using Hive’s set-based transformations to do our data loading and transformation on Hadoop, but in some cases – particularly when you’re querying data interactively, building up a data pipeline and working with nested data sets – Pig can be more appropriate.

Connecting to the Pig Console, and Pig Execution Options

Iteratively examining and analysing data from webserver log files is a great example of where Pig could be useful, as you naturally hone down and pivot the data as you’re looking at it, and in effect you’re looking to create a data pipeline from the raw logs through to whatever summary tables or files you’re looking to create. So let’s go back to the same input log files I used in the previous post on Hive and Impala, and this time bring them into Pig.

Within CDH (Cloudera Distribution including Hadoop) you can run Pig scripts either interactively from the Pig command-line shell, called “Grunt”, or you can submit them as workflow jobs using the Hue web interface and the Oozie workflow scheduler. The advantage of working with the interactive Grunt shell when you’re starting out is that you can run your commands one-by-one and examine the metadata structures that you create along the way, so let’s use that approach first and move on to batch scheduling later on.

I’ll start by SSH’ing into one of the CDH4.6 nodes and starting the Grunt shell:

Even from within the Grunt shell, there are two ways I can then run Pig. The default way is to have Grunt run your Pig commands as you’d expect, converting them in the end to MapReduce jobs which then run on your Hadoop cluster. Or, you can run in “local mode”, which again uses MapReduce but runs single-threaded and only on the machine you’re logged in to; this can often be faster when you’re just playing around with a local dataset and you want to see results quickly (you can turn on local mode by adding an ‘-x local’ flag when starting Grunt). In my example, though, I’m going to run Grunt in regular MapReduce mode.

Loading and Parsing the Weblog Files

I then define my first Pig relation, analogous to a relational table and technically a named Pig “bag”, like this:

Compared to the Hive table DDL script in the previous article example I posted, we declare the incoming dataset much more programmatically – the first row of the script creates a relation called “raw_logs”, analogous to a table in Hive, and declares it as having a single column (“line:chararray”) that maps onto a directory of files in HDFS (“/user/root/logs”). You can ask Pig (through the Pig command-line client, which I’m using now) to list out the structure of this relation using the “describe” command:

In this form the logs aren’t too useful though as each row contains all the data we want, as a single field. To take a look at what we’re working with currently, let’s create another relation that limits down the dataset to just five rows, and use the DUMP command to display the relation’s data on the screen:

What I’ve omitted for clarity in the above output is the MapReduce console output – what you’ll see if you run this in MapReduce mode is the process starting up, and then running, to retrieve 5 rows effectively at random from the whole set of log files, process them through the Map > Shuffle > Reduce process and then return them to the Grunt shell.

What would be really good though, of course, is if we could split these single log row columns into multiple ones, one for each part of the log entry. In the Hive example I posted the other day, I did this through a Hive “SerDe” that used a regular expression to split the file, and I can do something similar in Pig; Pig has a function called REGEX_EXTRACT_ALL that takes a regular expression and returns a field for each capture group in the expression, and so I can use it in conjunction with the FOREACH … GENERATE relational operator and FLATTEN to take the first set of data, run it through the regular expression and come out with another set of data that’s been split as I want it:

GENERATE in Pig tells it to create (or “project”) some columns out of an incoming dataset; FLATTEN eliminates any nesting in the resulting output (we’ll see more of FLATTEN and nesting in a moment). Notice how the DESCRIBE command afterwards now shows individual columns for the log elements, rather than just one single “line:chararray” column.
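To make the regex-and-flatten idea concrete outside of Pig, here is a minimal plain-Python sketch of the same step. It is not Pig Latin and not the script used in this post: the pattern assumes the standard Apache “combined” log format, and the field names and the sample line are made up for illustration.

```python
import re

# Assumed pattern for the Apache "combined" log format (an illustration only).
LOG_PATTERN = re.compile(
    r'^(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"$'
)

# Hypothetical column names, one per capture group in the pattern above.
FIELDS = ("remote_addr", "remote_logname", "user", "time",
          "request", "status", "bytes", "referer", "user_agent")


def parse_line(line):
    """Split one raw log line into named fields, or return None if it does not match."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    # One value per capture group, already "flat" rather than nested.
    return dict(zip(FIELDS, match.groups()))


# Made-up sample line using documentation-only IP and host names.
sample = ('192.0.2.10 - - [12/Mar/2014:10:15:32 +0000] '
          '"GET /2014/03/example-post/ HTTP/1.1" 200 5120 '
          '"http://example.com/" "Mozilla/5.0"')

parsed = parse_line(sample)
print(parsed["status"], parsed["request"])
```

The single-column row goes in, and a row with one named column per capture group comes out, which is the shape the later filtering and grouping steps rely on.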

Using Pig to Interactively Filter the Dataset

So now we’ve got a more useful set of rows and columns in the Pig relation, and like an Oracle table, unless we do something to order them later, they’re effectively held in random order. Something we can do now is filter the dataset, for example creating another relation containing just those log entries where the request 404’d, and then further filter that dataset to those 404’d requests that were made by users using IE6:

So how many of our website users are on IE6 and getting page not available errors? To find out, I create another relation that groups the entries up in a single row, and then generates a count of those rows that were aggregated:

and I can do a similar thing for all of the 404s:
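As a rough plain-Python sketch of this filter, filter again, then count flow (not Pig Latin; the record layout, the sample rows and the “MSIE 6” substring test are all assumptions made for illustration):

```python
# Hypothetical parsed log records; field names and values are illustrative only.
logs_base = [
    {"request": "GET /a HTTP/1.1", "status": "404",
     "user_agent": "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)"},
    {"request": "GET /b HTTP/1.1", "status": "200",
     "user_agent": "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)"},
    {"request": "GET /c HTTP/1.1", "status": "404",
     "user_agent": "Mozilla/5.0 (X11; Linux x86_64)"},
    {"request": "GET /d HTTP/1.1", "status": "404",
     "user_agent": "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0)"},
]

# First filter: keep only the requests that returned a 404.
logs_404 = [row for row in logs_base if row["status"] == "404"]

# Second filter, applied to the first result: keep only IE6 user agents.
logs_404_ie6 = [row for row in logs_404 if "MSIE 6" in row["user_agent"]]

# Counting collapses each filtered relation down to a single number.
count_404 = len(logs_404)
count_404_ie6 = len(logs_404_ie6)
print(count_404, count_404_ie6)
```

Each step produces a new named dataset from the previous one, which is what makes it easy to branch off and count the wider set of 404s as well as the IE6 subset.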

You can see these Pig scripts running in CDH’s Cloudera Manager web application, with the screenshot below showing one of them at the point where 92% of the Mapper parts have completed, waiting to hand-off to the Reducers; the console output in Grunt will show you the status too, the output of which I removed from the above two statements for clarity.


Grouping, Subsetting and Aggregating Data using Pig

How we generate counts and other aggregates is interesting in Pig. Pig has a relational operator called GROUP as we’ve seen before, and when you GROUP a relation by a column, or a group of columns, it creates a new relation that contains two columns; one called “group” that has the same datatype as whatever you grouped on (or a “tuple” made up of multiple columns, if you grouped-on more than one column), and a second column that’s named after whatever you grouped, i.e. the original relation. To take an example, if we grouped the logs_base relation on status code, you’d see the following if you then describe the resulting relation:

What’s interesting though about a Pig GROUP, and conceptually different to SQL (and therefore Hive)’s GROUP BY, is that this second column is actually in Pig terms a “bag”, a bag of rows (or “tuples”) that are unaltered compared to the original relation, i.e. they’ve not been aggregated up by the grouping, but are still at the same level of detail. So Pig gives you, apart from its step-by-step data flow method of working with data, this ability to group data whilst still preserving the detail of the individual grouped rows, leaving any summation or other aggregation step to something you do afterwards. So for example, if I wanted to see how many 200s, 404s and so on my log file dataset contained in total, I then tell Pig to iterate through these bags, project out the columns I’m interested in (in this case, just the status) and also perform aggregation over the grouping buckets specified in the GROUP relational operator:
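The difference from a SQL GROUP BY is easier to see with the data structure written out. The following is a plain-Python sketch of the idea, not Pig Latin; the `group_by` helper and the sample rows are hypothetical, and the sorting is only there to make the printed output predictable.

```python
from collections import defaultdict

# Hypothetical detail rows standing in for the parsed log relation.
logs_base = [
    {"status": "200", "request": "GET / HTTP/1.1"},
    {"status": "404", "request": "GET /missing HTTP/1.1"},
    {"status": "200", "request": "GET /blog/ HTTP/1.1"},
    {"status": "301", "request": "GET /old HTTP/1.1"},
    {"status": "200", "request": "GET /about HTTP/1.1"},
]


def group_by(relation, key):
    """Return (group, bag) pairs; each bag holds the original rows, unaggregated."""
    bags = defaultdict(list)
    for row in relation:
        bags[row[key]].append(row)
    # Sorted only so this sketch prints in a stable order.
    return sorted(bags.items())


# Step 1: grouping alone keeps every detail row inside its bag.
by_status = group_by(logs_base, "status")

# Step 2: aggregation is a separate pass over each bag.
by_status_count = [(group, len(bag)) for group, bag in by_status]
print(by_status_count)
```

After step 1 nothing has been lost: the bag for “200” still contains all three original rows, and it is only step 2 that reduces each bag to a count.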

So in that example, we told Pig to list out all of the groupings (i.e. the distinct list of status codes), and then run a count of rows against each of those groupings, giving us the output we’re interested in. We could, however, choose not to aggregate those rows at this point and instead treat each “bucket” formed by the grouping as a sub-selection, allowing us to, for example, investigate in more detail when and why the 301 errors – “Moved Permanently” – were caused. Let’s use that now to find out what the top 10 requests were that led to HTTP 301 errors, starting by creating another relation that just contains the ‘301’ group:

Looking at the structure of the relation this has created though, you can see that the rows we’ve grouped are all contained within a single bag called “logs_base”, and to do anything interesting with that data we’ll need to flatten it, which takes that bag and un-nests it:

Notice also how I referenced the two columns in the by_status_301 relation by positional notation ($0 and $1)? This is handy when either you’ve not got a proper schema defined for your data (all part of the “pigs eat anything” approach for Pig, in that it even handles data you don’t yet have a formal schema for), or when it’s just easier to refer to a column by position than work out its formal name.
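Un-nesting and positional access can be sketched in plain Python like this. It is an illustration of the idea rather than Pig Latin; the `flatten_bag` helper and the sample tuples are hypothetical.

```python
# A grouped row: position 0 is the group key, position 1 is the bag of original rows.
# The inner tuples are made-up (ip, time, request, status) values.
by_status_301 = [
    ("301", [
        ("192.0.2.1", "12/Mar/2014:10:15:32 +0000", "GET /old-page HTTP/1.1", "301"),
        ("192.0.2.2", "02/Apr/2014:08:01:10 +0000", "GET /feed HTTP/1.1", "301"),
    ]),
]


def flatten_bag(grouped):
    """Yield one flat tuple per bag member, prefixed with its group key."""
    for row in grouped:
        # Positional access, in the spirit of $0 and $1: no column names needed.
        group, bag = row[0], row[1]
        for member in bag:
            yield (group,) + member


by_status_301_flat = list(flatten_bag(by_status_301))
for flat_row in by_status_301_flat:
    print(flat_row)
```

One nested row holding a bag of two tuples becomes two ordinary rows, each carrying the group key alongside the original columns.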

So now we’ve got our list of log entries that have recorded HTTP 301 “permanently moved” error messages, let’s use another relation to project just the columns we want – the date and the requests – and also use some Pig string functions to extract the day, month and year, and also split the request field up into its constituent method, URI and protocol fields:
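The string handling involved is simple enough to sketch in plain Python. This is not the Pig code from the post: `split_fields` is a hypothetical helper, and it assumes timestamps in the usual Apache `dd/Mon/yyyy:hh:mm:ss zone` layout.

```python
def split_fields(time, request):
    """Derive day/month/year from the timestamp and method/uri/protocol from the request."""
    # "12/Mar/2014:10:15:32 +0000" -> the date is everything before the first colon.
    date_part = time.split(":", 1)[0]
    day, month, year = date_part.split("/")

    # "GET /path HTTP/1.1" -> three space-separated parts; pad with None so that
    # malformed requests such as "-" still produce three fields.
    parts = request.split(" ")
    method, uri, protocol = (parts + [None, None])[:3]

    return {"day": day, "month": month, "year": year,
            "method": method, "uri": uri, "protocol": protocol}


row = split_fields("12/Mar/2014:10:15:32 +0000", "GET /2014/03/example-post/ HTTP/1.1")
print(row["month"], row["year"], row["uri"])
```

Padding short requests with None is a design choice for this sketch, so that a junk request does not stop the whole flow.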

All of these statements just set-up the data flow, and no actual processing takes place until we choose to dump, or store, the results of the data flow – which again makes Pig great for iteratively building-up a data flow, or in BI terms maybe an ETL flow, before finally pulling the trigger at the end and generating the end result. Let’s do that now using the dump command:
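This deferred execution behaves much like a chain of Python generators, which makes for a handy mental model. The sketch below is plain Python rather than Pig, and the `executed` list exists purely to show when rows are actually read.

```python
executed = []  # records which rows were actually read, to make the laziness visible


def load(lines):
    """Generator standing in for a load step: reads nothing until iterated."""
    for line in lines:
        executed.append(line)
        yield line


def only_301(rows):
    """Lazy filter step: keeps rows whose (made-up) status suffix is 301."""
    return (row for row in rows if row.endswith(" 301"))


raw = ["GET /a 200", "GET /old 301", "GET /b 404", "GET /feed 301"]

# Defining the flow does no work yet: nothing has been read from `raw`.
flow = only_301(load(raw))
steps_before_dump = len(executed)

# Materialising the flow is what finally pulls rows through every step.
result = list(flow)
steps_after_dump = len(executed)

print(steps_before_dump, steps_after_dump, result)
```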

So we had around 85k “page permanently moved” errors in April, only a few in February, and a much larger number in March 2014. So which web page requests in March 2014 were the biggest cause of this error? Let’s focus on just that month and list out the top ten page requests that hit this error:
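A filter, group, count, order and limit sequence like this can be sketched in plain Python with a counter. The `top_requests` helper and the sample rows are hypothetical and are not the data from my logs.

```python
from collections import Counter

# Hypothetical (month, uri) pairs for requests that returned a 301.
rows_301 = [
    ("Mar", "/feed"), ("Mar", "/old-page"), ("Mar", "/feed"),
    ("Apr", "/feed"), ("Mar", "/feed"), ("Mar", "/old-page"),
    ("Mar", "/archive"),
]


def top_requests(rows, month, n=10):
    """Count requests per URI for one month and return the n most frequent."""
    counts = Counter(uri for row_month, uri in rows if row_month == month)
    return counts.most_common(n)


top_march = top_requests(rows_301, "Mar", n=2)
print(top_march)
```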

Joining Datasets in Pig 

So far we’ve worked with just a single set of data – the Apache weblog files that we’ve then filtered, subsetted, parsed, analysed and so forth. What would be really interesting, though, would be to bring in some additional reference or other lookup data to help us make more sense of the log activity on our website. One of the motivators for the people behind Pig, right at the start, was to give Hadoop the ability to join datasets, which up until then was really hard to do with just Java and MapReduce; as we’ll see later on there are still a lot of restrictions on how these joins take place, but Pig gives you the ability to join two or more datasets together, which we’ll do now in another example where we’ll look at the most popular blog posts, and blog authors, over the period covered by our logs.

Let’s start by taking the full set of logs, parsed into the separate elements of the log file entry, and add in additional columns for month and the request elements:

One thing you’re taught with Pig is “project early, and often”, so let’s remove the method and protocol columns from that dataset and then filter the remaining page requests to remove those that are blank or aren’t blog post requests:

Let’s now reduce that list down to the top ten page requests, the way we did before with pages causing 301 errors:

Not bad. What would be even better, though, would be if I could retrieve the full names of these posts in WordPress, on which our website runs, and also the author name. I’ve got a text file of post names, URLs and authors that’s been previously exported from our WordPress install, so let’s declare another relation to hold initially the raw rows from that file, like this:

Then split that file by the semicolon that delimits each of the entries (author, post name etc):

I’ll now take that relation and project just the columns I’m interested in:

Now I’ll do the join, and then take that join and use it to generate a combined list of pages and who wrote them:
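The split, project and join steps above amount to an equi-join on the page URL. Here is a plain-Python sketch of that logic, which is not the Pig code from this post: the column order in the export file, the sample rows and the variable names are all assumptions.

```python
# Hypothetical top-pages result: (uri, hit count).
top_pages = [
    ("/2014/03/post-a/", 120),
    ("/2014/02/post-b/", 95),
    ("/2014/01/unlisted/", 40),
]

# Hypothetical raw export lines; the real file's column order is not shown here.
raw_posts = [
    "Post A;/2014/03/post-a/;Author One",
    "Post B;/2014/02/post-b/;Author Two",
]

# Split each raw line on the semicolon delimiter.
posts = [tuple(line.split(";")) for line in raw_posts]

# Project the columns of interest and index them by URL (the join key).
posts_by_url = {url: (title, author) for title, url, author in posts}

# Inner equi-join on the URL: pages with no matching post are dropped.
pages_and_authors = [
    (uri, hits) + posts_by_url[uri]
    for uri, hits in top_pages
    if uri in posts_by_url
]

for joined_row in pages_and_authors:
    print(joined_row)
```

Note that the third page has no entry in the export, so an inner join silently drops it; whether that is what you want depends on the question you are asking of the data.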

and then finally, output the joined set of data to a comma-separated file in HDFS:

Once that’s run, I can use Grunt’s “cat” command to output the contents of the file I just created:

But What About More Complex Joins and Transformations … Enter, Pig Extensibility

This is of course great, but going back to my previous Hive example I also managed to geo-code the log file entries, converting the IP addresses into country names via a lookup to a geocoding database. What made that example “interesting” though was the need to join the Hive table of log posts to the geocode table via BETWEEN, or the greater-than and less-than operators, so that I could locate each IP address within the ranges given by the geocoding database – and the reason it got interesting was that Hive can only do equi-joins, not non-equijoins such as those involving greater than, BETWEEN and so on. Impala *could* do it, and on a small set of input rows – five in my example – it worked fine. Try to scale the Impala query up to the full dataset though, and the query fails because it runs out of memory; and that’s potentially the issue with Impala, and set-based queries, as Impala does everything in-memory, and most Hadoop systems are designed for fast I/O, not lots of memory.

So can Pig help here? Well, it’s actually got the same limitation – non-equijoins are actually quite difficult to do in Hadoop because of the way MapReduce works, but where Pig could perhaps help is through its extensibility – you can stream Pig data, for example IP addresses, through Perl and Python scripts to return the relevant country, or you can write Pig UDFs – User-Defined Functions – to return the information we need in a similar way to how PL/SQL functions in Oracle let you call out to arbitrary functions to return the results of a more complex look-up. But this is also where things get a bit more complicated, so we’ll save this for the next post in this series, where I’ll also be joined by my colleague Nelio who’s spent the best part of this last week VPNd into my VMware-based Hadoop cluster getting this last example working.
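To give a flavour of how a per-row function can stand in for a range join, here is a minimal Python sketch of an IP-to-country lookup using a binary search over sorted ranges. It is only an assumption about how such a lookup might be written, not the approach we’ll use in the next post; the ranges, country labels and the `country_for` name are all made up, and the ranges are assumed not to overlap.

```python
import bisect
import ipaddress


def ip_to_int(ip):
    """Convert a dotted IPv4 string to its integer value."""
    return int(ipaddress.ip_address(ip))


# Hypothetical geocoding table: (range start, range end, country), non-overlapping.
RANGES = sorted([
    (ip_to_int("192.0.2.0"), ip_to_int("192.0.2.255"), "Country A"),
    (ip_to_int("198.51.100.0"), ip_to_int("198.51.100.255"), "Country B"),
    (ip_to_int("203.0.113.0"), ip_to_int("203.0.113.127"), "Country C"),
])
STARTS = [start for start, _end, _country in RANGES]


def country_for(ip):
    """Return the country whose range contains ip, or None if no range matches."""
    value = ip_to_int(ip)
    # Find the last range that starts at or before this address.
    index = bisect.bisect_right(STARTS, value) - 1
    if index < 0:
        return None
    start, end, country = RANGES[index]
    # The address may still fall in a gap after that range ends.
    return country if start <= value <= end else None


print(country_for("198.51.100.42"), country_for("203.0.113.200"))
```

Because each address is resolved on its own, the BETWEEN condition never has to be expressed as a join at all; the cost is that the lookup table has to be available to every task that runs the function.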