Going Beyond MapReduce for Hadoop ETL Pt.3 : Introducing Apache Spark

December 9th, 2014 by

In the first two posts in this three-part series on going beyond MapReduce for Hadoop ETL, we looked at why MapReduce and Hadoop 1.0 were only really suitable for batch processing, and how the new Apache Tez framework, enabled by Apache YARN on the Hadoop 2.0 platform, can be swapped in for MapReduce to improve the performance of existing Pig and Hive scripts. Today, in the final post, I want to take a look at Apache Spark, the next-gen compute framework that Cloudera are backing as the long-term successor to MapReduce.

Like Tez, Apache Spark supports DAGs that describe the entire dataflow process, not just individual map and reduce jobs. Like Pig, it has a concept of datasets (Pig’s aliases and relations), but crucially these datasets (RDDs, or “resilient distributed datasets”) can be cached in-memory, fall back gracefully to disk and can be rebuilt using a graph that records how to reconstruct them. With Tez, individual jobs in the DAG can now hand off their output to the next job in-memory rather than having to stage it in HDFS, but Spark uses memory for the actual datasets and is a much better choice for the types of iterative, machine-learning tasks that you tend to do on Hadoop systems. Moreover, Spark arguably has a richer API, particularly when used with Scala, a functional programming-orientated language that uses Java libraries and whose collections framework maps well on to the types of operations you’d want to make use of with dataflow-type applications on a cluster.
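
To make the idea of a dataset that is cached in memory but can be rebuilt from its recipe a little more concrete, here is a minimal sketch in plain Python. It is not the Spark API: the ToyDataset class and its method names are made up for illustration, and it runs on a single machine. It only shows the principle that each dataset remembers its parent and the transformation applied, so a lost cache can be recomputed.

```python
class ToyDataset:
    """A toy stand-in for an RDD: it holds a recipe (lineage), not just data."""

    def __init__(self, source, parent=None, transform=None):
        self._source = source        # base rows, only set on the root dataset
        self._parent = parent        # the dataset this one was derived from
        self._transform = transform  # function applied to the parent's rows
        self._cache = None           # in-memory copy, filled by cache()
        self.compute_count = 0       # how many times this step was recomputed

    def map(self, fn):
        return ToyDataset(None, self, lambda rows: [fn(r) for r in rows])

    def filter(self, pred):
        return ToyDataset(None, self, lambda rows: [r for r in rows if pred(r)])

    def cache(self):
        # Materialise the rows once and keep them in memory.
        self._cache = self.collect()
        return self

    def evict(self):
        # Simulate losing the in-memory copy.
        self._cache = None

    def collect(self):
        if self._cache is not None:
            return list(self._cache)
        # No cached copy: walk back up the lineage and recompute.
        self.compute_count += 1
        if self._parent is None:
            return list(self._source)
        return self._transform(self._parent.collect())


logs = ToyDataset(["/home 200", "/blog 200", "/missing 404"])
ok = logs.filter(lambda line: line.endswith("200")).map(lambda line: line.split()[0])

ok.cache()
first = ok.collect()    # served from the in-memory copy
ok.evict()
second = ok.collect()   # rebuilt from the lineage, same rows as before
```

In this sketch the second collect() gives the same rows as the first, at the cost of recomputing the steps. That is the trade-off the paragraph above describes.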

Spark can run standalone, on YARN or on other cluster management platforms, and comes with a handy command-line interpreter that you can use to interactively load, filter, analyse and work with RDDs. Cloudera CDH5.2 comes with Spark 1.0.1 and can either be configured standalone or to run on YARN, with Spark as a service added to nodes in the cluster using Cloudera Manager. 

Looking back at the Pig example, we created the dataflow using a number of aliases that we progressively filtered, transformed, joined together and then aggregated to get to the final top ten set of pages from the website logs. Translating that dataflow to Spark, we end up with a similar set of RDDs that take our initial set of logs, apply transformations, join the datasets and store the final aggregated output back on HDFS.

Spark supports in-memory sharing of data within a single DAG (i.e. RDD to RDD), but also between DAGs running in the same Spark instance. As such, Spark becomes a great framework for doing iterative and cyclic data analysis, and can make much better use of the memory on cluster servers whilst still using disk for overflow data and persistence.

Moreover, Spark powers a number of higher-level tools built on the core Spark engine to provide features like real-time loading and analysis (Spark Streaming), SQL access and integration with Hive (Spark SQL), machine learning (MLlib) and so forth. In fact, as well as Hive and Pig being reworked to run on Tez, there are also projects underway to port them both to Spark, though to be honest they’re both at early stages compared to the Tez integration, and most probably you’ll be using Scala, Java or Python to work with Spark for now.

So taking the Pig script we had earlier and translating that to the same dataflow in Spark and Scala, we end up with something like this:

I’ll do a code walkthrough for this Spark application in a future post, but for now note the map and flatMap Scala collection functions used to transform RDDs, and the sql(“…”) function that lets us manipulate the contents of RDDs registered as tables using SQL, including joining them to other RDDs registered as tables. For now though, let’s run the application on the CDH5.2 cluster using YARN and see how long it takes to process the same set of log files (remember, the Pig script on this CDH5.2 cluster took around 5 minutes to run, and the Pig on Tez version on the Hortonworks cluster took around 2.5 minutes):

It ran in just over a minute in the end, and most of that was around submitting the job to YARN – not bad. We’ll be covering more of Spark on the blog over the next few weeks, including streaming and machine learning examples, and connecting it to ODI and OBIEE via Hive on Spark and Spark SQL’s own Hive-compatible Thrift server. I’ll also be taking a look at Pig on Spark (or “Spork”…) to see how well that works, and most interestingly how Pig and Hive on Spark compare to running them on Tez – watch this space, as they say.

Six Months an ACE… “Boy, that escalated quickly”

December 8th, 2014 by

Boy, that escalated quickly. I mean, my path to becoming an Oracle ACE since joining Rittman Mead!

When I began with Rittman Mead back in March 2012, I wasn’t planning on joining the ACE program. In fact, I really wasn’t sure what an Oracle ACE even was. If you’re not familiar, the Oracle ACE program “highlights excellence within the global Oracle community by recognizing individuals who have demonstrated both technical proficiency and strong credentials as community enthusiasts and advocates”. That’s quite a mouthful! Now let’s find out how I got to this point in my career.

I wrote my first blog post ever shortly after joining Rittman Mead, sharing an innovative way to integrate Oracle Data Integrator and GoldenGate for data warehousing. This led to submitting my first abstracts to several Oracle User Group conferences. My goal was to challenge myself to grow as a public speaker, something I had never been very good at in the past, and to share my data integration know-how, of course. To my amazement, the first abstract submitted, for UKOUG 2012, was accepted. Then, the other conferences continued to accept my abstracts! During the first couple of years, I kept blogging and speaking, wrote an article for RMOUG SQL>UPDATE Magazine, and joined in on a couple of ODTUG ODI expert webcasts and OTN ArchBeat podcasts. Before I knew it, I had taken advantage of many knowledge sharing opportunities and built a decent list for my Oracle ACE nomination. I’ve found that networking and sharing your experiences are both wonderful ways to get noticed by other Oracle experts.

BI Forum 2014

So now what is an Oracle ACE to me? I think it is someone knowledgeable about an Oracle product (or products) who enjoys sharing their knowledge and loves contributing to the greater Oracle community. It’s that simple. At Rittman Mead, sharing what we know is a part of the ethos of the company. If you’re an avid reader of this blog, you’re well aware that Mark Rittman and others love to share their technical knowledge. We also have an internal system that allows us to share our know-how amongst each other. That’s one of the great benefits of working at Rittman Mead. If I don’t know the answer to a question, I can easily reach out to the entire company and get a response from experts like Mark, James Coyle, Andy Rocha or others within minutes. I love that about this organization!

Well, around March or April, it was mentioned in a company meeting that anyone with the goal of joining the ACE program should reach out to one of our current ACEs or ACE Directors and work on a plan to get there (again, a testament to how the RM organization works and thinks). I talked with Stewart Bryson, Oracle ACE Director. After reviewing my contribution to the Oracle community he agreed to submit the nomination for me. Again, to my amazement, and very much my delight, my nomination was accepted as an Oracle ACE! What began as several small goals of improving my writing and speaking skills, sharing technical content, and having an article published, led to the overall end goal (though not known to me at the time) of an Oracle ACE program invitation. I worked hard to earn the nomination, but I also know I was very fortunate to have been surrounded by, and mentored by, some great individuals (and I still am!).

oow14-badge-small

Since joining the ACE program, I’ve gained some additional Twitter followers, a great new ribbon on my conference badges, and some sweet ACE gear – but really it feels like not too much else has changed. Well, I do have an additional group of experts available to help me when I’m in a bind, because that’s what ACEs tend to do. I can now send a question via a tweet or email to someone in the ACE program and I’ll typically get an expert answer. It’s a community of like-minded individuals that love their specific Oracle technology – and love to share knowledge and help others. If this sounds like you, it’s time to work towards that ACE nomination!

Here at Rittman Mead we have several ACEs: myself, Venkat Janakiraman, Edelweiss Kammermann, and our newest ACE, Robin Moffatt, as well as an ACE Director, Mark Rittman. And our ACEs don’t just sit back and blog or speak at conferences (not that blogging and speaking aren’t hard work – they are!). We also provide all of the services Rittman Mead offers as consultants: training, consulting services, managed services support – all of it! Which means that when you hire Rittman Mead, the 5 folks in the ACE program are included, along with the other 100+ BI experts throughout the world. If you need some help on your Business Intelligence, Data Integration, or Advanced Analytics project, drop us a line at info@rittmanmead.com and we’ll be happy to have a chat.

Now that I’ve met the goal of Oracle ACE, what’s ahead for me? Well, I’m constantly working to keep up on the latest and greatest from the Oracle Data Integration team, while also consulting full-time. I plan to continue speaking and writing blog posts, but also work towards some additional published content in newsletters and magazines for the Oracle community. And I’m going to keep learning, because education is a never-ending journey. “They’ve done studies, you know. 60 percent of the time, it works every time.” — Brian Fantana 

By the way, if you think Rittman Mead is the type of company you’d like to work for, give us a shout at careers@rittmanmead.com.

Going Beyond MapReduce for Hadoop ETL Pt.2 : Introducing Apache YARN and Apache Tez

December 8th, 2014 by

In the first post in this three part series on going beyond MapReduce for Hadoop ETL, I looked at how a typical Apache Pig script gets compiled into a series of MapReduce jobs, and those MapReduce jobs pass data between themselves by writing intermediate resultsets to disk (HDFS, the Hadoop cluster file system). As a reminder, here’s the Pig script we’re working with:

When you submit the script for execution using the Pig client, it parses the Pig Latin script, logically optimizes it and then compiles it into MapReduce programs; these programs are then sent in turn to the Hadoop 1.0 JobTracker, which sends them for execution on the Hadoop cluster – in the case of our Pig script, there are five MapReduce programs generated in total.

Each one of these MapReduce programs is what’s called a “directed acyclic graph”, or DAG. A directed acyclic graph is a programming style within distributed computing where processing is broken down into functions that can be run independently of one another, as long as one is not an ancestor of another. In MapReduce terms, this means that all mappers can run independently of each other (and therefore on different nodes in a cluster) and it’s only the reducers that have to wait for their ancestors to finish before they can also start their work independently of the other reducers. It’s a great programming model for processing large amounts of data with fault tolerance across a cluster, and it was the key insight that made possible MapReduce and the “big data” systems that we work with today.
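
As a small illustration of the "run independently unless one is an ancestor of another" rule, the sketch below uses Python's standard graphlib module (Python 3.9+) to group a made-up task graph into waves. The task names and the execution_waves helper are hypothetical and have nothing to do with how Hadoop schedules work internally. The point is only that all mappers land in one wave and the reducers in the next.

```python
from graphlib import TopologicalSorter

# Hypothetical task graph: each key depends on the tasks in its value set.
dependencies = {
    "map_1": set(),
    "map_2": set(),
    "map_3": set(),
    "reduce_1": {"map_1", "map_2", "map_3"},
    "reduce_2": {"map_1", "map_2", "map_3"},
}


def execution_waves(deps):
    """Group tasks into waves; tasks within a wave do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises CycleError if the graph is not acyclic
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose ancestors are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(dependencies)
for number, tasks in enumerate(waves, start=1):
    print(f"wave {number}: {tasks}")
```

Each wave could in principle be spread across different nodes, which is what gives the model its parallelism.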

A Pig script that generates five MapReduce jobs then effectively creates five DAGs, with each one using files (HDFS) to persist and hand-off data between them and JVMs being spun-up for the individual map and reduce jobs. As such, there’s no way for this version of MapReduce and the Hadoop framework it runs on to consider the overall dataflow, and each DAG has to run in isolation.

To address this issue, version 2.0 of Hadoop introduced a new feature called Apache YARN, or “Yet Another Resource Negotiator”. YARN took on the resource management and job scheduling/monitoring parts of Hadoop, effectively becoming an “operating system” for Hadoop that other frameworks could run on; initially MapReduce2 (reworked to run on YARN), but since then others such as Apache Tez and Apache Spark.

YARN also crucially supported frameworks that used DAGs describing the entire dataflow, not just individual MapReduce jobs, and a new framework that came out of this capability was Apache Tez. Tez is a generalisation of the MapReduce distributed compute framework that supports these dataflow-style DAGs and either runs MapReduce code unaltered, or offers its own API for describing DAGs using vertices (logic and resources) and edges (connections). Both Hive and Pig have been ported to run on Tez, and in Pig’s case this means another type of compiler is added alongside the existing MapReduce one. Pig scripts executed on Tez can now submit a single DAG that encompasses all stages in the dataflow, with those stages linked together via in-memory passing of intermediate results rather than having to write them to disk.

In practice, what this means is that if your version of Pig or Hive has been updated to also run on Tez, you can run your code on Tez and typically see a 2-3x performance improvement without any changes to your code or application logic. Hortonworks have been the main Hadoop vendor backing Tez and their new Hortonworks Data Platform 2.2 comes with Tez support, so I took my Pig script and ran it on a five-node cluster to get an initial timing. It took 4 min 17 secs to run, again generating the same five MapReduce jobs that I saw on the Cloudera CDH5 cluster. Then, running it using Tez as the execution engine (pig -x tez analyzeblog.pig), the same script took 2 min 25 secs to run, a little under twice as fast as when we ran it using regular MapReduce.
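
To put a number on the two timings quoted above, here is the arithmetic as a short Python snippet. The to_seconds helper is just a convenience written for this example.

```python
def to_seconds(minutes, seconds):
    """Convert a minutes-and-seconds timing into total seconds."""
    return minutes * 60 + seconds


mapreduce_secs = to_seconds(4, 17)   # Pig on MapReduce: 4 min 17 secs
tez_secs = to_seconds(2, 25)         # Pig on Tez: 2 min 25 secs

speedup = mapreduce_secs / tez_secs
print(f"{mapreduce_secs}s vs {tez_secs}s, speedup {speedup:.2f}x")
```

That works out to 257 seconds against 145 seconds, a speedup of roughly 1.8x on this particular script and cluster.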

And Tez is great for getting existing Pig and Hive scripts to run faster – if your platform supports it, you should use it instead of MapReduce as your execution engine; MapReduce code submitted “as is” will benefit from better YARN container re-use, and Hive and Pig scripts that run on the Tez execution engine can run as a single DAG and use memory, rather than disk, to pass data between jobs in the DAG. For ETL and analytic jobs that you’re creating from scratch though, Apache Spark is arguably the framework you should look to use instead of Tez, and tomorrow we’ll find out why.

Going Beyond MapReduce for Hadoop ETL Pt.1 : Why MapReduce Is Only for Batch Processing

December 7th, 2014 by

Over the previous few months I’ve been looking at the various ways you can load data into Hadoop, process it and then report on it using Oracle tools. We’ve looked at Apache Hive and how it provides a SQL layer over Hadoop, making it possible for tools like ODI and OBIEE to use their usual SQL set-based process approach to access Hadoop data; later on, we looked at another Hadoop tool, Apache Pig, which provides a more dataflow-type language over Hadoop for when you want to create step-by-step data pipelines for processing data. Under the covers, both Hive and Pig generate Java MapReduce code to actually move data around, with MapReduce then working hand-in-hand with the Hadoop framework to run your jobs in parallel across the cluster.

But MapReduce can be slow; it’s designed for very large datasets and batch processing, with overall analysis tasks broken-down into individual map and reduce tasks that start by reading data off disk, do their thing and then write the intermediate results back to disk again.
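
The read, process, write-back-to-disk cycle can be sketched in a few lines of plain Python. This is a toy, single-machine imitation and not Hadoop code: the function names, the JSON staging file and the "page status" log layout are all assumptions made for the example. What it does show is the map output being staged on disk before the reduce step reads it back in.

```python
import json
import tempfile
from collections import defaultdict
from pathlib import Path


def map_phase(lines):
    """Emit (page, 1) for each log line; the page is assumed to be the first field."""
    return [(line.split()[0], 1) for line in lines]


def reduce_phase(pairs):
    """Sum the counts for each page."""
    totals = defaultdict(int)
    for page, count in pairs:
        totals[page] += count
    return dict(totals)


def run_job(lines, workdir):
    """Run map then reduce, staging the map output on disk in between."""
    staged = Path(workdir) / "map_output.json"
    staged.write_text(json.dumps(map_phase(lines)))  # intermediate result hits disk
    pairs = json.loads(staged.read_text())           # the reducer reads it back
    return reduce_phase(pairs)


with tempfile.TemporaryDirectory() as workdir:
    hits = run_job(["/home 200", "/blog 200", "/home 200"], workdir)

print(hits)
```

Chain several such jobs together and every hand-off pays the same write-then-read cost, which is where much of the elapsed time goes.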

Whilst this approach means the system is extremely fault-tolerant and effectively infinitely-scalable, this writing to disk of each step in the process means that MapReduce jobs typically take a long time to run and don’t really take advantage of the RAM that’s available in today’s commodity servers. Whilst this is a limitation most early adopters of Hadoop were happy to live with (in exchange for being able to cheaply analyse data on a scale previously unheard of), over the past few years as Hadoop adoption has broadened there have been a number of initiatives to move Hadoop past its batch processing roots and into something more real-time that does more of its processing in-memory. Whilst there are a whole bunch of projects and products out there that claim to improve the speed of Hadoop processing and bring in-memory capabilities – Apache Drill, Cloudera Impala, Oracle’s Big Data SQL are just some examples – the two that are probably of most interest to Hadoop customers working in an Oracle environment are Apache Spark and Apache Tez. But before we get into the details of Spark, Tez and how they improve over MapReduce, let’s take a look at why MapReduce can be slow.

MapReduce and Hadoop 1.0 – Scalable, Fault-Tolerant, but Aimed at Batch Processing

Going back to MapReduce and what’s now termed “Hadoop 1.0”, MapReduce works on the principle of breaking larger jobs down into lots of smaller ones, with each one running independently and persisting its results back to disk at the end to ensure data doesn’t get lost if a server node breaks down. To take an example, the Apache Pig script below reads in some webserver log files, parses and filters them, aggregates the data and then joins it to another Hadoop dataset before outputting the results to a directory in the HDFS storage layer:

Pig works by defining what are called “relations” or “aliases”, similar to tables in SQL that contain data or pointers to data. You start by loading data into a relation from a file or other source, and then progressively define further relations that take that initial dataset and apply filters, use transformations, re-orientate the data or join it to other relations until you’ve arrived at the final set of data you’re looking for. In this example we start with raw log data, parse it, filter out bot and spider activity, project just the columns we’re interested in and then remove further “noise” from the logs, then join it to reference data and finally return the top ten pages over that period based on total hits.

Pig uses something called “lazy evaluation”, where relations you define don’t necessarily get created when they’re defined in the script; instead they’re used as a pointer to data and instructions on how to produce it if needed, with the Pig interpreter only materializing a dataset when it absolutely has to (for example, when you ask it to store a dataset on disk or output to console). Moreover, all the steps leading up to the final dataset you’ve requested are considered as a whole, giving Pig the ability to merge steps, miss out steps completely if they’re not actually needed to produce the final output, and otherwise optimize the flow of data through the process.
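
Python generators behave in a similar "define now, compute later" way, so they make a handy analogy for lazy evaluation. The sketch below is not Pig Latin and does no optimization of the plan. The function names and the "page user-agent" log layout are invented for the example. It simply shows that chaining the steps reads no data, and that data only flows once a result is requested.

```python
from collections import Counter

stats = {"lines_read": 0}


def load(lines):
    """Yield raw log lines one at a time, counting how many were actually read."""
    for line in lines:
        stats["lines_read"] += 1
        yield line


def parse(lines):
    # Assumed log layout for illustration: "<page> <user agent>"
    for line in lines:
        page, agent = line.split(" ", 1)
        yield {"page": page, "agent": agent}


def drop_bots(records):
    """Filter out records whose user agent looks like a bot."""
    return (r for r in records if "bot" not in r["agent"].lower())


raw = ["/home Firefox", "/blog Googlebot", "/home Chrome", "/about Safari"]

# Defining the pipeline reads nothing: each step is only a recipe.
pipeline = drop_bots(parse(load(raw)))
read_before = stats["lines_read"]

# Asking for a result is what finally pulls data through every step.
top_pages = Counter(r["page"] for r in pipeline).most_common(2)
read_after = stats["lines_read"]

print(read_before, read_after, top_pages)
```

The count of lines read stays at zero until the final aggregation runs, which is the same behaviour Pig shows when it waits for a store or dump before doing any work.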

Running the Pig script and then looking at the console output from the script running in the Grunt command-line interpreter, you can see that five separate MapReduce jobs were generated to load in the data, filter, join and transform it, and then produce the output we requested at the end.

Pig generated five separate MapReduce jobs that loaded, parsed, filtered, aggregated and joined the datasets as part of an overall data “pipeline”, with the intermediate results staged to disk before the next MapReduce job took over. On my six-node CDH5.2 VM cluster it took just over five minutes to load, process and aggregate 5m records from our site’s webserver.

Now the advantage of this approach is that it’s more or less infinitely scalable and certainly resilient, but whilst Pig can look at your overall dataflow “graph” and come up with an optimally efficient way to get to your end result, MapReduce treats every step as atomic and separate and insists on writing every intermediate step to disk before moving on.

What this means in practice is that ETL routines that use Pig, Hive and MapReduce, whilst scaling well, never really get to the point where you can run them as micro-batches or in real-time. For that type of scenario we need to look at moving away from MapReduce and breaking the link between Hadoop (the platform, the cluster management and resource handling part) and the processing that runs on it, so that we can run alternative execution engines on the Hadoop platform such as Apache Tez, which we’ll cover in tomorrow’s post.

All You Need, and Ever Wanted to Know About the Dynamic Rolling Year

December 4th, 2014 by

DRY_1

Among the many reporting requirements that have comprised my last few engagements has been what, from now on, I’ll be referring to as the ‘Dynamic Rolling Year’. That is, through the use of a dashboard prompt, the user is able to pick a year – month combination and the chosen view, in this case a line graph, will dynamically display a year’s worth of figures with the chosen year – month as the most recent point. The picture above better demonstrates this feature. You can see that we’ve chosen the ‘201212’ year – month combination and as a result, our view displays revenue data from 201112 through to 201212. Let’s choose another combination to make sure things work properly.

DRY_2

From the results on the picture above, it seems we’ve gotten our view to do as expected. We’ve chosen ‘201110’ as our starting year – month combination and the view has changed to display revenue trend back to ‘201010’. Now how do we do this? A quick web search will offer up myriad complex and time-consuming solutions which, while completely feasible, can be onerous to implement. While it was tempting to reap the benefits of someone else’s perhaps hard fought solution, I just knew there had to be an easier way. As a note of caution, I’ll preface the next bit by saying that this only works on a dashboard through using a dashboard prompt in tandem with the target analysis. This solution was tested via a report prompt, however the desired results were not obtained, even when ‘hard coding’ default values for our presentation variables. As well, there are a few assumptions I’m making around what kinds of columns you have in your subject area’s time dimension, namely that you have some sort of Year and Month column within your hierarchy.

1. Establish Your Analysis and Prompt

DRY_3

I’ve brought in the Month (renamed it Year – Month) column from Sample App’s Sample Sales subject area along with the Revenue fact. For the sake of this exercise, we’re going to keep things fairly simple and straightforward. Our Month column exists in the ‘YYYY / MM’ format so we’re going to have to dice it up a little bit to get it to where we need it to be. To get our dynamic rolling year to work, all we’re going to do is a little math. In a nutshell, we’re going to turn our month into an integer as ‘YYYYMM’ and simply subtract a hundred as a filter using the presentation variables that correspond to our new month column. To make the magic happen, bring in a dummy column from your time dimension, concatenate the year and month together, cast them as a char and then the whole statement as an integer, as seen in the following formula: CAST(substring("Time"."T02 Per Name Month", 1, 4)||substring("Time"."T02 Per Name Month", -2, 2) AS INT). Then, apply a ‘between’ filter to this column using two presentation variables.
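
The "little math" can be checked outside OBIEE with a couple of lines of Python. This is only a sketch of the arithmetic, not of anything OBIEE runs. The function names are made up, and the label format is assumed to be the 'YYYY / MM' style described above.

```python
def year_month_to_int(label):
    """Turn a 'YYYY / MM' label into the integer YYYYMM, e.g. '2012 / 12' -> 201212."""
    return int(label[:4] + label[-2:])


def rolling_year_bounds(selected):
    """Return the (start, end) of the window that ends at the selected YYYYMM value."""
    return selected - 100, selected


selected = year_month_to_int("2012 / 12")
start, end = rolling_year_bounds(selected)
print(start, end)
```

Subtracting 100 from a YYYYMM integer moves the year back by one while leaving the month untouched, which is why the window for 201212 starts at 201112.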

DRY_4

Don’t worry that we’ve yet to establish them on our dashboard prompt, as that will happen later. In the picture above, I’ve set the filter to be between two expressions built on the same presentation variable, ‘YEARMONTH_VAR’: the variable minus a hundred, and the variable itself. I’ve subtracted a hundred because, in this case, my year – month concatenation exists in the ‘YYYYMM’ format. You may alter this number as needed. For example, a client already had this column established in their Time dimension, yet it was in the ‘YYYYMMM’ format with a leading zero before the month. In that case, we had to subtract 1000 in order to establish the rolling year. Moving on… After you’ve set up the analysis, save it off. The results tab will error out at this point; this is normal. Now let’s build the dashboard prompt, where we’re going to be essentially doing the same thing.
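
To see why the offset changes with the column format, here is a small Python imitation of the 'between' filter. The in_rolling_year function is hypothetical, and the seven-digit values assume the client's format looked like 2011010 (year, a zero, then the two-digit month), which is one reading of the description above.

```python
def in_rolling_year(value, selected, offset=100):
    """Mimic the 'between' filter: keep values from (selected - offset) to selected."""
    return selected - offset <= value <= selected


# Six-digit YYYYMM keys: an offset of 100 steps back exactly one year.
months = [201009, 201010, 201105, 201110, 201111]
kept = [m for m in months if in_rolling_year(m, 201110)]

# Seven-digit keys with a zero before the month (assumed layout): offset of 1000.
wide_months = [2010009, 2010010, 2011010, 2011011]
kept_wide = [m for m in wide_months if in_rolling_year(m, 2011010, offset=1000)]

print(kept, kept_wide)
```

Because the filter is inclusive at both ends, the window contains the selected month plus the same month a year earlier, so a full window holds thirteen month values.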

We need to get our analysis and prompt to work together (like they should) and so we’ll need to, once again, format the year – month column on the prompt the same way we did it on our analysis through the ‘prompt for column’ link. You can simply copy and paste the formula you used in the ‘Edit Formula’ column on your analysis into this screen. Don’t forget to assign a presentation variable to this column with whatever name you used on the filter in your analysis (in this case it was YEARMONTH_VAR).

DRY_6

DRY_7

DRY_8

2. Build the Dashboard

2014-12-04_10-04-56

Starting a new dashboard, bring in your prompt and analysis from the Shared Folder. In this exercise, I’ve placed the prompt below the analysis and have done some custom formatting. If you’d like to create this example exactly, then in addition to the default table view, you’ll need to create a line graph view that groups by our year – month column and uses the revenue column as the measure. And voila. Save and run the dashboard. Note that if you didn’t adopt a default value for your dashboard prompt, it will error out before you actually pick and apply a value from the prompt drop-down. Admittedly, the closer to a concatenated year-month (YYYYMM) column you have in your time dimension, the easier the implementation of this method will be. You could even set your default value to an appropriate server/repository variable so that your dashboard opens to the most current and / or a desired year-month combination. However, now that you’ve seen this, hopefully it’s sparked some inspiration into how you might apply this technique to other facets of your work and future reporting.

DRY_1
