Going Beyond MapReduce for Hadoop ETL Pt.1: Why MapReduce Is Only for Batch Processing

Over the previous few months I’ve been looking at the various ways you can load data into Hadoop, process it and then report on it using Oracle tools. We’ve looked at Apache Hive and how it provides a SQL layer over Hadoop, making it possible for tools like ODI and OBIEE to use their usual set-based SQL approach to access Hadoop data; later on, we looked at another Hadoop tool, Apache Pig, which provides a more dataflow-style language over Hadoop for when you want to create step-by-step pipelines for processing data. Under the covers, both Hive and Pig generate Java MapReduce code to actually move data around, with MapReduce then working hand-in-hand with the Hadoop framework to run your jobs in parallel across the cluster.

But MapReduce can be slow; it’s designed for very large datasets and batch processing, with overall analysis tasks broken down into individual map and reduce tasks that start by reading data off disk, do their thing and then write the intermediate results back to disk again.
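
To make that read-process-write cycle concrete, here is a minimal single-machine sketch in Python. It is not Hadoop code and does not use any Hadoop API; the function names, file names and sample log lines are all invented for illustration. The map step writes its key/value pairs to a file, and the reduce step has to read them back from that file before it can aggregate anything.

```python
import json
import tempfile
from collections import defaultdict
from pathlib import Path


def map_phase(lines, work_dir):
    """Map: emit a (page, 1) pair per log line and write the pairs to disk."""
    spill_file = Path(work_dir) / "map_output.jsonl"  # hypothetical file name
    with spill_file.open("w") as out:
        for line in lines:
            page = line.split()[0]  # assumes the page is the first field
            out.write(json.dumps([page, 1]) + "\n")
    return spill_file


def reduce_phase(spill_file, work_dir):
    """Reduce: read the intermediate pairs back off disk and sum them per page."""
    totals = defaultdict(int)
    with spill_file.open() as src:
        for record in src:
            page, count = json.loads(record)
            totals[page] += count
    result_file = Path(work_dir) / "part-00000.json"  # hypothetical file name
    result_file.write_text(json.dumps(totals, sort_keys=True))
    return result_file


def run_job(lines):
    """Run one map step and one reduce step, with a disk write between them."""
    with tempfile.TemporaryDirectory() as work_dir:
        spill_file = map_phase(lines, work_dir)
        result_file = reduce_phase(spill_file, work_dir)
        return json.loads(result_file.read_text())


if __name__ == "__main__":
    print(run_job(["/a 200", "/b 200", "/a 404"]))
```

Nothing is handed from the map step to the reduce step in memory; the only thing passed between them is a file path. That is the property that makes a failed step easy to re-run, and also the one that costs the time.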

Whilst this approach means the system is extremely fault-tolerant and effectively infinitely scalable, writing the results of each step to disk means that MapReduce jobs typically take a long time to run and don’t really take advantage of the RAM that’s available in today’s commodity servers. This is a limitation most early adopters of Hadoop were happy to live with, in exchange for being able to cheaply analyse data on a scale previously unheard of. Over the past few years, though, as Hadoop adoption has broadened, there have been a number of initiatives to move Hadoop past its batch-processing roots and into something more real-time that does more of its processing in-memory.

There are a whole bunch of projects and products out there that claim to improve the speed of Hadoop processing and bring in-memory capabilities (Apache Drill, Cloudera Impala and Oracle’s Big Data SQL are just some examples), but the two that are probably of most interest to Hadoop customers working in an Oracle environment are Apache Spark and Apache Tez. Before we get into the details of Spark, Tez and how they improve over MapReduce, though, let’s take a look at why MapReduce can be slow.

MapReduce and Hadoop 1.0 - Scalable, Fault-Tolerant, but Aimed at Batch Processing

Going back to MapReduce and what’s now termed “Hadoop 1.0”, MapReduce works on the principle of breaking larger jobs down into lots of smaller ones, with each one running independently and persisting its results back to disk at the end to ensure data doesn’t get lost if a server node breaks down. To take an example, the Apache Pig script below reads in some webserver log files, parses and filters them, aggregates the data and then joins it to another Hadoop dataset before outputting the results to a directory in the HDFS storage layer:

register /opt/cloudera/parcels/CDH/lib/pig/piggybank.jar
raw_logs = LOAD '/user/mrittman/rm_logs' USING TextLoader AS (line:chararray);
logs_base = FOREACH raw_logs
  GENERATE FLATTEN(
    REGEX_EXTRACT_ALL(line,'^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"')
  ) AS (remoteAddr: chararray, remoteLogname: chararray, user: chararray, time: chararray, request: chararray, status: chararray, bytes_string: chararray, referrer: chararray, browser: chararray);
logs_base_nobots = FILTER logs_base BY NOT (browser matches '.*(spider|robot|bot|slurp|monitis|Baiduspider|AhrefsBot|EasouSpider|HTTrack|Uptime|FeedFetcher|dummy).*');
logs_base_page = FOREACH logs_base_nobots GENERATE SUBSTRING(time,0,2) as day, SUBSTRING(time,3,6) as month, SUBSTRING(time,7,11) as year, FLATTEN(STRSPLIT(request,' ',5)) AS (method:chararray, request_page:chararray, protocol:chararray), remoteAddr, status;
logs_base_page_cleaned = FILTER logs_base_page BY NOT (SUBSTRING(request_page,0,3) == '/wp' or request_page == '/' or SUBSTRING(request_page,0,7) == '/files/' or SUBSTRING(request_page,0,12) == '/favicon.ico');
logs_base_page_cleaned_by_page = GROUP logs_base_page_cleaned BY request_page;
page_count = FOREACH logs_base_page_cleaned_by_page GENERATE FLATTEN(group) as request_page, COUNT(logs_base_page_cleaned) as hits;
page_count_sorted = ORDER page_count BY hits DESC;
page_count_top_10 = LIMIT page_count_sorted 10;
posts = LOAD '/user/mrittman/posts.csv' USING org.apache.pig.piggybank.storage.CSVExcelStorage() as (post_id:int,title:chararray,post_date:chararray,post_type:chararray,author:chararray,url:chararray,generated_url:chararray);
posts_cleaned = FOREACH posts GENERATE CONCAT(generated_url,'/') as page_url,author as author, title as title;
pages_and_post_details = JOIN page_count by request_page, posts_cleaned by page_url;
pages_and_posts_trim = FOREACH pages_and_post_details GENERATE page_count::request_page as request_page, posts_cleaned::author as author, posts_cleaned::title as title, page_count::hits as hits;
pages_and_posts_sorted = ORDER pages_and_posts_trim BY hits DESC;
pages_and_post_top_10 = LIMIT pages_and_posts_sorted 10;
store pages_and_post_top_10 into 'top_10s/pages';

Pig works by defining what are called “relations” or “aliases”, similar to tables in SQL, that contain data or pointers to data. You start by loading data into a relation from a file or other source, and then progressively define further relations that take that initial dataset and apply filters, use transformations, re-orientate the data or join it to other relations until you’ve arrived at the final set of data you’re looking for. In this example we start with raw log data, parse it, filter out bot and spider activity, project just the columns we’re interested in and then remove further “noise” from the logs, then join it to reference data and finally return the top ten pages over that period based on total hits.

Pig uses something called “lazy evaluation”, where relations you define don’t necessarily get created when they’re defined in the script; instead they’re used as a pointer to data and instructions on how to produce it if needed, with the Pig interpreter only materializing a dataset when it absolutely has to (for example, when you ask it to store a dataset on disk or output it to the console). Moreover, all the steps leading up to the final dataset you’ve requested are considered as a whole, giving Pig the ability to merge steps, miss out steps completely if they’re not actually needed to produce the final output, and otherwise optimize the flow of data through the process.
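
The idea of deferring work until a store is requested can be sketched in a few lines of Python. This is only an illustration of lazy evaluation in general, not how the Pig interpreter is implemented: the `Relation` class, its method names and the sample log lines are all hypothetical, and the sketch shows deferral only, not the merging or reordering of steps that Pig’s optimizer can also do.

```python
class Relation:
    """A named step in a dataflow plan: it holds instructions, not data."""

    def __init__(self, name, op, parent=None):
        self.name = name
        self.op = op          # function: upstream iterator -> iterator
        self.parent = parent  # the relation this one is derived from

    @classmethod
    def load(cls, name, rows):
        # Defining the load records where the data is; nothing is read yet.
        return cls(name, lambda _upstream: iter(rows))

    def foreach(self, name, fn):
        return Relation(name, lambda upstream: (fn(row) for row in upstream), self)

    def filter(self, name, keep):
        return Relation(name, lambda upstream: (row for row in upstream if keep(row)), self)

    def _rows(self):
        upstream = self.parent._rows() if self.parent is not None else None
        return self.op(upstream)

    def store(self):
        # Only here is the chain of steps walked and the data produced.
        return list(self._rows())


parsed_lines = []  # records every line the parse step actually touches


def parse(line):
    parsed_lines.append(line)
    method, page = line.split(" ")
    return {"method": method, "page": page}


raw_logs = Relation.load("raw_logs", ["GET /a", "GET /wp-login", "GET /b"])
logs_base = raw_logs.foreach("logs_base", parse)
pages = logs_base.foreach("logs_base_page", lambda rec: rec["page"])
cleaned = pages.filter("logs_base_page_cleaned", lambda page: not page.startswith("/wp"))

work_done_before_store = len(parsed_lines)  # still zero: nothing has run
stored_pages = cleaned.store()              # the whole chain runs now
work_done_after_store = len(parsed_lines)
```

Defining the four relations does no work at all; the parse function is first called when `store()` is. A relation that is defined but never stored would never be evaluated, which is the simplest form of “missing out” a step.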

Running the Pig script and then looking at the console output from the Grunt command-line interpreter, you can see that five separate MapReduce jobs were generated to load in the data, filter, join and transform it, and then produce the output we requested at the end.

JobId                  Maps Reduces Alias                                                                                                               Feature Outputs
job_1417127396023_0145 12   2       logs_base,logs_base_nobots,logs_base_page,logs_base_page_cleaned,logs_base_page_cleaned_by_page,page_count,raw_logs GROUP_BY,COMBINER 
job_1417127396023_0146 2    1       pages_and_post_details,pages_and_posts_trim,posts,posts_cleaned                                                     HASH_JOIN 
job_1417127396023_0147 1    1       pages_and_posts_sorted                                                                                              SAMPLER 
job_1417127396023_0148 1    1       pages_and_posts_sorted                                                                                              ORDER_BY,COMBINER 
job_1417127396023_0149 1    1       pages_and_posts_sorted                                                                                              hdfs://bdanode1....pages2,

Together, these five MapReduce jobs loaded, parsed, filtered, aggregated and joined the datasets as part of an overall data “pipeline”, with the intermediate results staged to disk before the next MapReduce job took over. On my six-node CDH5.2 VM cluster it took just over five minutes to load, process and aggregate five million records from our site’s webserver.

Now the advantage of this approach is that it’s more or less infinitely scalable and certainly resilient, but whilst Pig can look at your overall dataflow “graph” and come up with an efficient way to get to your end result, MapReduce treats every step as atomic and separate and insists on writing every intermediate step to disk before moving on.
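
As a rough illustration of what that staging costs, the Python sketch below runs the same three-step pipeline twice: once with every step’s output written to a file and read back before the next step starts, and once with the results simply handed on in memory. Everything here is an assumption made for the example (the stage functions, the sample rows and the file names); it is not a model of how Hadoop or any other engine actually schedules work.

```python
import json
import tempfile
from pathlib import Path


def drop_bots(rows):
    return [r for r in rows if "bot" not in r["browser"]]


def count_by_page(rows):
    counts = {}
    for r in rows:
        counts[r["page"]] = counts.get(r["page"], 0) + 1
    return [[page, hits] for page, hits in counts.items()]


def top_pages(rows, n=2):
    # Most hits first; ties broken by page name so the order is stable.
    return sorted(rows, key=lambda r: (-r[1], r[0]))[:n]


def run_staged(rows, stages, work_dir):
    """Treat each stage as a separate job: write its output, then re-read it."""
    staged_files = []
    current = list(rows)
    for i, stage in enumerate(stages):
        path = Path(work_dir) / f"job_{i}.json"  # hypothetical staging file
        path.write_text(json.dumps(stage(current)))
        staged_files.append(path)
        current = json.loads(path.read_text())
    return current, staged_files


def run_in_memory(rows, stages):
    """Hand each stage's output straight to the next, with no staging."""
    current = list(rows)
    for stage in stages:
        current = stage(current)
    return current


sample_rows = [
    {"page": "/a", "browser": "Firefox"},
    {"page": "/a", "browser": "Chrome"},
    {"page": "/b", "browser": "Googlebot"},
    {"page": "/b", "browser": "Safari"},
    {"page": "/c", "browser": "Chrome"},
    {"page": "/c", "browser": "Chrome"},
    {"page": "/c", "browser": "Firefox"},
]
stages = [drop_bots, count_by_page, top_pages]

with tempfile.TemporaryDirectory() as work_dir:
    staged_result, staged_files = run_staged(sample_rows, stages, work_dir)
    staged_file_count = len(staged_files)

in_memory_result = run_in_memory(sample_rows, stages)
```

Both runs arrive at the same answer; the staged run just makes three round trips to disk on the way. On a toy dataset that overhead is invisible, but it is the same pattern that adds up across the five jobs in the Pig example.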

What this means in practice is that ETL routines that use Pig, Hive and MapReduce, whilst scaling well, never really get to the point where you can run them as micro-batches or in real-time. For that type of scenario we need to look at moving away from MapReduce and breaking the link between Hadoop (the platform, with its cluster management and resource handling) and the processing that runs on it, so that we can run alternative execution engines such as Apache Tez on the Hadoop platform. We’ll cover Tez in tomorrow’s post.