Hadoop Orchestration


Most data processing tasks with Hadoop require multiple Hadoop jobs with dependencies between them. The dependency arises out of the need for one job to use the output of another job. The dependency between Hadoop jobs can be expressed as a directed acyclic graph (DAG), where each node represents a Hadoop job. A DAG is useful for modelling relationships between entities that have a partial ordering.

The dependencies should be decoupled from the basic Hadoop job implementation, so that the same jobs can be combined in different ways into work flows, with the flow, i.e. the job dependencies, preferably defined declaratively.

In this post, I will discuss a simple Hadoop work flow system I implemented.

Current Landscape

There is a wide spectrum of solutions in use.  At one end we have shell scripts that many people resort to.  Hadoop also provides a JobControl class for orchestrating and executing multiple Hadoop jobs.
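For a simple linear chain, JobControl may be all that is needed. As a rough sketch, and assuming jobA and jobB are already configured Job instances with jobB consuming the output of jobA, the dependency wiring looks something like this (in newer Hadoop versions these classes live under org.apache.hadoop.mapreduce.lib.jobcontrol):

// Sketch: chaining two dependent Hadoop jobs with the built-in JobControl.
// jobA and jobB are assumed to be fully configured Job instances created
// elsewhere; only the dependency wiring is shown here.
import java.util.Arrays;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class JobControlSketch {
    // Run jobB only after jobA has completed successfully.
    public static void runChain(Job jobA, Job jobB) throws Exception {
        ControlledJob first = new ControlledJob(jobA, null);
        ControlledJob second = new ControlledJob(jobB, Arrays.asList(first));

        JobControl control = new JobControl("chain");
        control.addJob(first);
        control.addJob(second);

        // JobControl implements Runnable; run it in a thread and poll for completion
        new Thread(control).start();
        while (!control.allFinished()) {
            Thread.sleep(1000);
        }
        control.stop();
    }
}

This works, but the dependency wiring lives in Java code rather than in a declarative flow definition.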

At the other end of the spectrum there are complex server based Hadoop work flow systems. They provide a web based interface. The most popular is Oozie from Yahoo. Azkaban from LinkedIn is another Hadoop work flow system. If you want all the features these systems offer and you are ready to manage a server based installation, then they should be seriously considered.

Yet another option is to use an SQL like data processing language such as Pig or Hive. The runtime for these tools translates the data processing functions to map reduce jobs that get executed under the hood. One limitation of these tools is that data processing is restricted to the functions supported, although they offer extension mechanisms through user defined functions.

I was interested in a small footprint client side Java library as my work flow system. Since I didn't find any, I ended up implementing one. The purpose of this post is to provide the details of my solution, called fluxua. The source code for fluxua is hosted on github and it's fully functional. All you have to do to use it is to include the jar in your Hadoop application.

Simple Client Side Library

When a Hadoop work flow is executed with fluxua, it becomes the entry point. The Hadoop jobs and the flows defined in terms of those jobs are specified in a JSON file. I will take a sample JSON configuration file and dissect it line by line to show how fluxua works.

The work flow engine is multi threaded.

The main driver runs in a thread executing a loop. Every time through the loop it finds one or more jobs that are ready to run. A job is ready to run when all the jobs it depends on have successfully executed.

Each job is launched from a dedicated job launcher thread. The main driver thread then waits on a blocking queue for messages from the job launching threads.

The job launching thread invokes the job class through the Hadoop ToolRunner class and then blocks until the Hadoop job has completed. When the job completes, the launcher thread inserts a success or failure message into the blocking queue, depending upon whether the job succeeded or failed.

The job class should start the Hadoop job in blocking mode, otherwise the work flow will not be able to track the job.

One exception might be for jobs that are at the DAG leaves, i.e., jobs that no other job depends upon. These jobs could be executed in non blocking mode, in which case control will immediately return to the launcher thread.

If multiple jobs are found to be ready to run, they are launched in parallel from multiple threads. Each job launching thread communicates with the main driver thread through the blocking queue.
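To make the mechanics concrete, here is a highly simplified sketch of the driver and launcher thread interaction. This is not fluxua's actual code; the class and message names are made up for illustration.

// Simplified sketch of the driver / launcher-thread pattern described above.
// Not fluxua's actual implementation; JobStatus and the scheduling details
// are made up for illustration.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DriverSketch {
    static class JobStatus {
        final String jobName;
        final boolean succeeded;
        JobStatus(String jobName, boolean succeeded) {
            this.jobName = jobName;
            this.succeeded = succeeded;
        }
    }

    private final BlockingQueue<JobStatus> statusQueue = new LinkedBlockingQueue<JobStatus>();

    // Launch one job in its own thread; report success or failure through the queue.
    void launch(final String jobName, final Tool jobTool, final String[] args) {
        new Thread(new Runnable() {
            public void run() {
                boolean ok;
                try {
                    // blocks until the Hadoop job completes
                    ok = ToolRunner.run(new Configuration(), jobTool, args) == 0;
                } catch (Exception e) {
                    ok = false;
                }
                statusQueue.offer(new JobStatus(jobName, ok));
            }
        }).start();
    }

    // Main driver loop: wait for a completion message, then schedule whatever became ready.
    void driverLoop() throws InterruptedException {
        while (moreJobsPending()) {
            JobStatus status = statusQueue.take();   // blocks until some job finishes
            if (!status.succeeded) {
                break;                               // stop scheduling new jobs on failure
            }
            // find jobs whose prerequisites are now all complete and launch them...
        }
    }

    private boolean moreJobsPending() { return false; /* placeholder for real bookkeeping */ }
}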

The work flow engine is non intrusive. It does not require a job class to extend any specific class or implement any specific interface. It should just implement the Hadoop Tool interface, as is normally done.

Existing map reduce implementations can easily be incorporated into the work flow without code changes.

Since the Hadoop Tool interface does not have any specific reference to map reduce, the job class does not necessarily have to implement map reduce; it could implement any logic, e.g., copying files or executing Pig or Hive scripts.
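For example, a typical job class is just a plain Tool implementation along the following lines. The class below is hypothetical and not taken from my project.

// A hypothetical job class: a plain Hadoop Tool implementation with no
// fluxua specific base class or interface.
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

public class MyJob extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "my job");
        job.setJarByClass(MyJob.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // set mapper, reducer and key/value classes here as usual
        // run in blocking mode so the work flow can track the job
        return job.waitForCompletion(true) ? 0 : 1;
    }
}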

Configuration Dissection

The configuration consists of definitions of a set of Hadoop jobs, followed by definitions of one or more flows. The job definition consists of all the configuration a job needs.

A flow definition consists of defining the job flow DAG, which models the dependencies between the jobs.  Each node in the DAG represents a Hadoop job.

In the flow definition, each job lists the jobs it depends on.

Here is an example from one of my projects. This project is about prediction of certain visitor attributes based on click stream data, using a Bayesian filtering technique. This is a simple flow, with only three jobs, one job depending on the other two. The two parent jobs could run in parallel. When they both finish, the third job starts.

{
	"interactive" : false,
	"verbose" : true,
	"jobConfigs" :
	[
		{
			"name" : "dor",
			"description" : "domain range of atributes",
			"author" : "pranab",
			"className" : "mawazo.zaal.Main",
			"inputPaths" : ["zaal/input"],
			"outputPath" : "zaal/dor/output",
			"files" : ["/home/pranab/Projects/zaal/meta/metadata.json"],
			"libjars" :
			["/home/pranab/Projects/lib/jackson/jackson-core-lgpl-1.6.3.jar",
			"/home/pranab/Projects/lib/jackson/jackson-mapper-lgpl-1.6.3.jar" ],
			"userParams" :
			["zaal.dynAttr.file.path=/zaal/meta/domrange.txt",
			"zaal.metaData.file.path=/zaal/meta/metadata.json",
			"mapred.map.tasks.speculative.execution=false",
			"mapred.reduce.tasks.speculative.execution=false"],
			"useDependentOutput" : false
		},
		{
			"name" : "pam",
			"description" : "page category mapper",
			"author" : "pranab",
			"className" : "mawazo.zaal.Main",
			"inputPaths" : ["zaal/input"],
			"outputPath" : "zaal/pam/output",
			"files" : ["/home/pranab/Projects/zaal/meta/metadata.json"],
			"libjars" :
			["/home/pranab/Projects/lib/jackson/jackson-core-lgpl-1.6.3.jar",
			"/home/pranab/Projects/lib/jackson/jackson-mapper-lgpl-1.6.3.jar" ],
			"userParams" :
			["zaal.metaData.file.path=/zaal/meta/metadata.json",
			"mapred.map.tasks.speculative.execution=false",
			"mapred.reduce.tasks.speculative.execution=false"],
			"useDependentOutput" : false
		},
		{
			"name" : "bcl",
			"description" : "bayesian classifier",
			"author" : "pranab",
			"className" : "mawazo.zaal.Main",
			"inputPaths" : [],
			"outputPath" : "/zaal/bcl/output",
			"libjars" :
			["/home/pranab/Projects/lib/jackson/jackson-core-lgpl-1.6.3.jar",
			"/home/pranab/Projects/lib/jackson/jackson-mapper-lgpl-1.6.3.jar" ],
			"userParams" :
			["zaal.dynAttr.file.path=/zaal/meta/domrange.txt",
			"zaal.metaData.file.path=/zaal/meta/metadata.json",
			"mapred.map.tasks.speculative.execution=false",
			"mapred.reduce.tasks.speculative.execution=false"],
			"useDependentOutput" : true

		},
		{
			"name" : "hva",
			"description" : "hold out validation",
			"author" : "pranab",
			"className" : "mawazo.zaal.Main",
			"outputPath" : "zaal/hva/output"
		}
	],
	"flowConfigs" :
	[
		{
			"name" : "bayesian",
			"description" : "bayesian prediction for visitor profile",
			"author" : "pranab",
			"flowNodes" :
			[
				{
					"job" : "dor"
				},
				{
					"job" : "pam"
				},
				{
					"job" : "bcl",
					"preReqJobs" : ["dor", "pam"]
				}
			]

		}
	]
}

Most of the parameters are self explanatory. Global configuration parameters are as follows:

interactive If set to true, the driver will prompt before executing the next job. The user has the option of quitting before the next job is started.
verbose If set to true, detailed messages will be displayed in the console.

Job configuration parameters are as follows. They reflect the command line parameters expected by the Hadoop jar command.

name Job name. Referred to by the flow definition. It gets passed to the Hadoop job class, which can use it to set the job name
description Job description, mostly for documentation purposes
author Job implementor, mostly for documentation purposes
className Java class name for the Hadoop job
inputPaths List of HDFS input paths
libjars List of additional jars to be distributed in the cluster
files List of files to be placed in the distributed cache
userParams User configuration parameters
useDependentOutput If set to false, the output of the parent job won't be used as input

Flow configuration parameters are as follows. Multiple flows can be defined. While invoking the work flow, the user can specify the name of the flow to execute as a command line argument.

name Flow name
description Flow description
author Flow implementor, mostly for documentation purposes
iterClassName A Java class name for flow iteration
flowNodes List of flow nodes, consisting of a list of job names. For each job, all the jobs it depends on are specified

Flow Iteration

Sometimes a whole flow needs to be executed iteratively, until some criterion is met. The flow configuration parameter iterClassName specifies a user provided Java class name that implements the Java Iterator interface. This feature is particularly useful for data mining or optimization problems that require recursion or iteration.

As long as the iterator class's hasNext() method returns true, the whole flow gets re-executed. A default flow iterator class is provided that ensures that the flow gets executed only once; its hasNext() method returns true only for the first call.
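As an illustration, an iterator that re-runs the flow a fixed number of times could look like the sketch below. Only the hasNext() contract described above is assumed; everything else about the class is hypothetical.

// Hypothetical flow iterator: re-runs the flow a fixed number of times.
// Only the hasNext() contract described above is assumed; how fluxua
// constructs and calls this class is not shown here.
import java.util.Iterator;

public class FixedCountFlowIterator implements Iterator<Integer> {
    private final int maxIterations;
    private int count = 0;

    public FixedCountFlowIterator(int maxIterations) {
        this.maxIterations = maxIterations;
    }

    // The flow is re-executed as long as this returns true.
    public boolean hasNext() {
        return count < maxIterations;
    }

    public Integer next() {
        return count++;
    }

    public void remove() {
        throw new UnsupportedOperationException();
    }
}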

Running the Workflow

The flow described in the configuration can be executed with the following shell script:

DR_JAR=/home/pranab/Projects/fluxua/target/fluxua-1.0.jar
JAR=/home/pranab/Projects/zaal/target/zaal-1.0.jar
CL=org.fluxua.driver.JobDriver
CONFIG=/home/pranab/Projects/zaal/zaal.json
export HADOOP_CLASSPATH=$DR_JAR:$JAR
echo $HADOOP_CLASSPATH
hadoop fs -rmr /zaal/dor/output
hadoop fs -rmr /zaal/bcl/output
hadoop jar $JAR $CL $CONFIG bayesian sample

The first argument after the class name is the JSON config file path. The next argument is the name of the flow to execute. The next argument is the flow instance name for this particular invocation. The flow instance name is combined with the job names to form the job names for the current invocation of the flow.

If a set of jobs needs to be skipped for this invocation of the flow, they can be provided as the last argument, as a comma separated list of job names.
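For example, to re-run the flow above while skipping the already completed dor and pam jobs, the last line of the script might become

hadoop jar $JAR $CL $CONFIG bayesian sample dor,pam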

Failure Handling

If a job fails, no additional jobs are executed. The main driver thread waits until all currently executing jobs have finished and then it quits.

One might argue that even with a failed job, those jobs that are not dependent on the failed job should still execute. In other words, if the failed job node is not on the dependency path of a job node in the DAG, then that job should run. That's not the way it behaves currently, but the change can be made easily.

It is possible to execute a partially completed flow, by specifying the jobs to skip when launching the work flow. In case of a failed job, you might fix the problem and then restart the flow, but this time only execute the jobs that have not executed yet. This is a useful feature during testing and debugging.

Processing Time Based Data

One of the common Hadoop data processing tasks is to process accumulated time based data at regular intervals, e.g., processing accumulated log data once every hour.

If the log file names are date and time encoded, then fluxua allows you to provide the Hadoop job input path as a template and then have a user defined class convert the template to an actual path. Some additional configuration for the Hadoop job will be needed. Here is an example.

	"inputPaths" : ["/.../.../myapp_{date}.log"],
	"inputProcessorClass" : "org.fluxua.driver.DateEncodedPathProcessor",
	"inputProcessorParams" : ["timeUnit", "hour", "timeInterval", "2"],

The DateEncodedPathProcessor class processes the input path and replaces the placeholder with the date and time of the past hour. In our example, the path after conversion might be myapp_2011_05_15_10.log. If the config parameter inputProcessorClass is provided, the driver will call the processInputPath() method of the class and will use the path returned by that method. The value of inputProcessorParams is used to initialize the path processor object. The parameters are specific to the path processor class.

For this example, the work flow might be run as a cron job once every 2 hours. If the log files are created every two hours, the cron job might be launched 15 minutes after every 2 hours to process the last two hours' worth of data.

The DateEncodedPathProcessor class extends the base PathProcessor class and is included in fluxua. For any other transformation of a path defined as a template, the user can provide an appropriate class extending PathProcessor.
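The core of the transformation is just a date encoded placeholder substitution. Here is a standalone sketch of that idea; it is not the actual fluxua PathProcessor API, and the class and method names are made up.

// Standalone sketch of date encoded path substitution, roughly what a
// path processor does. Not the actual fluxua PathProcessor API; the class
// and method names are made up for illustration.
import java.text.SimpleDateFormat;
import java.util.Calendar;

public class DatePathSketch {
    // Replace the {date} place holder with the date and hour of the previous hour.
    public static String resolve(String template) {
        Calendar cal = Calendar.getInstance();
        cal.add(Calendar.HOUR_OF_DAY, -1);
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy_MM_dd_HH");
        return template.replace("{date}", fmt.format(cal.getTime()));
    }

    public static void main(String[] args) {
        // e.g. /data/myapp_{date}.log  ->  /data/myapp_2011_05_15_10.log
        System.out.println(resolve("/data/myapp_{date}.log"));
    }
}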

Wrapping Up

Fluxua is a simple, small footprint client side Java library. Although it does not have all the bells and whistles of Oozie or Azkaban, it meets the basic needs of a Hadoop work flow system.


Responses to Hadoop Orchestration

  1. pariksheet says:

    Hi Pranab, I have very basic doubt. How to read user parameters in my MapReduce code, specified in configuration json.

    e.g. {
    "userParams" :
    ["hbase.master=aws_dev_cluster_ip"]
    }

    how to read this value in my MR code.

    Many Thanks
    Pari

    • Pranab says:

      Pari
      You could copy the JSON file to HDFS and read it in the setup method of your mapper or reducer. I do that a lot for metadata in many of my projects

  2. pariksheet says:

    Thanks Pranab. Suggestion worked for me.

    Cheers
    Pari
