I was motivated to write this post after reading "The Mathematics behind Hadoop Based System". When you want to improve the performance of your Hadoop application, there are many options for tweaking the system. The question is where to focus to get the biggest bang for your buck. This post addresses that question with a simple model and an example.

## Performance Model

When processing data collected over time, the time taken for a Hadoop job can be described as

t = ov + (ps + pa) * h

• ov = the fixed overhead of running a job
• ps = time spent in the Hadoop system to process an hour's worth of data
• pa = time spent in map and reduce code to process an hour's worth of data
• h = number of hours over which data is collected
• t = total processing time
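As a minimal sketch, the model can be written as a small Python function. The function name `job_time` is my own, and the sample values (1 min overhead, 6 min and 4 min per hour of data) are borrowed from the example later in this post purely for illustration.

```python
def job_time(ov: float, ps: float, pa: float, h: float) -> float:
    """Estimated job time t = ov + (ps + pa) * h.

    ov, ps and pa must share one time unit; ps and pa are per hour of data.
    """
    if h < 0:
        raise ValueError("h must be non-negative")
    return ov + (ps + pa) * h


# Illustrative values in minutes: 1 min overhead, 6 + 4 min per hour of data.
one_hour = job_time(ov=1.0, ps=6.0, pa=4.0, h=1.0)
three_hours = job_time(ov=1.0, ps=6.0, pa=4.0, h=3.0)
print(one_hour, three_hours)
```

The overhead is paid once per job, so tripling the data does not triple the total time.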

Unlike the post I mentioned, I have divided the dynamic part of the processing time into two parts: one for time spent in the Hadoop system and the other for time spent in the application logic.

The post also talks about stable processing time, which corresponds to h being equal to t. In this condition the Hadoop job is perpetually busy, i.e., by the time a job finishes, it is time to start the next run. I am leaving the equation as is and not considering stable processing time.

There is a fixed overhead cost in running any Hadoop job, which is represented by ov. The overhead is mostly associated with the work involved in job submission, which is handled by the jobtracker. For the most part, it’s independent of the data size.

The dynamic part of the time has two components. The first component, represented by ps, covers all the time the Hadoop system itself spends processing a job. This part is mostly disk and network IO bound. Broadly speaking, it consists of the following:

• Input and output for map
• Shuffle
• Input and output for reduce

The second component, represented by pa, is the time taken by the application logic, i.e. the time spent in the map and reduce functions. This time is CPU bound unless the map and reduce functions access remote systems, which is not desirable. It consists of the following:

• Execution of map
• Execution of reduce

## Performance Tweaking

For a given cluster, the overhead is more or less fixed and not much can be done about it. Our focus is on the dynamic part of the time, which depends on data size. We would like to see how much the overall performance improves when ps or pa is improved.

Cloudera has a good post on performance tips. This Yahoo presentation in slideware also has some important performance tips. These tips mostly concern the overhead and the system processing time.

## System Processing Time

Let’s say we have improved ps by a factor of n by tweaking various Hadoop config parameters, so that the new system processing time is n * ps with n < 1. The overall performance improvement factor is then the ratio of the two time values:

t2/t1 = (ov + (n * ps + pa) * h) / (ov + (ps + pa) * h)

After some juggling this yields

t2/t1 = n + (1 - n) * ((ov + pa * h) / (ov + ps * h + pa * h))

We would like the second term to be as small as possible, which happens when ps dominates over ov and pa. When ps is large, the overall performance improvement will be close to the improvement in ps.
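The two forms of the ratio can be checked against each other numerically. The sketch below is only an illustration: the function names are mine, and the input values are made up to contrast a balanced job with one where ps dominates.

```python
import math


def ratio_direct(ov, ps, pa, h, n):
    """t2/t1 computed straight from the model, with ps scaled to n * ps."""
    return (ov + (n * ps + pa) * h) / (ov + (ps + pa) * h)


def ratio_rearranged(ov, ps, pa, h, n):
    """Same ratio written as n + (1 - n) * (share of time not spent in ps)."""
    rest = (ov + pa * h) / (ov + ps * h + pa * h)
    return n + (1 - n) * rest


n = 0.8
# Made-up inputs: one job where ps equals pa, one where ps dominates.
balanced = ratio_direct(ov=0.05, ps=0.2, pa=0.2, h=1.0, n=n)
ps_heavy = ratio_direct(ov=0.05, ps=20.0, pa=0.2, h=1.0, n=n)

# Both forms agree, and the ps-heavy ratio is much closer to n.
assert math.isclose(balanced, ratio_rearranged(0.05, 0.2, 0.2, 1.0, n))
print(round(balanced, 3), round(ps_heavy, 3))
```

The closer the ratio gets to n, the more of the tuning gain in ps shows up in the total job time.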

## Application Processing Time

If we improve pa by a factor of n (again with n < 1) by optimizing the map and reduce code, the overall performance ratio is as follows

t2/t1 = (ov + (ps + n * pa) * h) / (ov + (ps + pa) * h)

which simplifies to

t2/t1 = n + (1 - n) * ((ov + ps * h) / (ov + ps * h + pa * h))

We will get the best overall performance improvement when pa dominates over ov and ps, which is typically not the case for most Hadoop jobs, unless you have very complex CPU intensive logic in your map and reduce functions.
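For readers who want the simplification step spelled out, here is the algebra for the pa case in LaTeX. The symbol T is a shorthand I introduce here for the original job time; everything else comes from the model above. The ps case follows the same steps with ps and pa swapped.

```latex
\begin{align*}
T &= \mathit{ov} + \mathit{ps} \cdot h + \mathit{pa} \cdot h \\
\frac{t_2}{t_1}
  &= \frac{\mathit{ov} + \mathit{ps} \cdot h + n \cdot \mathit{pa} \cdot h}{T} \\
  &= \frac{n\,T + (1 - n)\,(\mathit{ov} + \mathit{ps} \cdot h)}{T} \\
  &= n + (1 - n)\,\frac{\mathit{ov} + \mathit{ps} \cdot h}
                       {\mathit{ov} + \mathit{ps} \cdot h + \mathit{pa} \cdot h}
\end{align*}
```

The third line works because expanding n T + (1 - n)(ov + ps h) cancels the n ov and n ps h terms, leaving ov + ps h + n pa h.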

## Pseudo Real Time Processing

Although Hadoop is a batch processing system, we can get some insight into how well it may work for near real time processing. By equating t and h, we get the minimum possible latency for near real time processing by Hadoop. It’s as follows

t = ov / (1 - (pa + ps))

Here ps and pa must be expressed as fractions, i.e. hours of processing per hour of data. To take an example, let’s say ov = 1 min, ps = 6 min and pa = 4 min. To process an hour’s worth of data it takes 11 min in this example. With ps + pa = 10/60, plugging these values into the equation above gives t = 1 / (1 - 1/6) = 1.2 min, i.e. 72 sec.

To get the minimum possible latency for this example, we have to run Hadoop every 72 sec and it will process the data collected over the last 72 sec. So the latency will be between 72 sec and 144 sec.
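The 72 sec figure can be reproduced with a few lines of Python. This is a sketch under the model's assumptions; the function name `stable_interval` is my own, and all inputs are converted to hours so that ps + pa is a plain fraction.

```python
def stable_interval(ov: float, ps: float, pa: float) -> float:
    """Solve t = ov + (ps + pa) * t for t.

    ov is in hours; ps and pa are hours of processing per hour of data,
    so ps + pa is a dimensionless fraction that must be below 1.
    """
    load = ps + pa
    if load >= 1:
        raise ValueError("job cannot keep up: ps + pa must be < 1")
    return ov / (1 - load)


# Values from the example: 1 min overhead, 6 min and 4 min per hour of data.
t_hours = stable_interval(ov=1 / 60, ps=6 / 60, pa=4 / 60)
t_seconds = t_hours * 3600

# A record arriving at the end of a window waits one processing run;
# one arriving at the start also waits out the whole collection window.
best_case, worst_case = t_seconds, 2 * t_seconds
print(round(best_case), round(worst_case))
```

The guard on `load` reflects the fact that if ps + pa reaches 1, the job takes at least as long as the data it covers and no stable interval exists.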

In real life it’s not advisable to run Hadoop on small datasets, unless the data collection rate is so high that a significant amount of data is collected in a 72 sec window.

## Example

We will use this example to show some numbers for overall performance improvement.

n = 0.8, ov = 0.05 hours, ps = 0.2 hours

The overhead time is 0.05 hours and the system processing time (ps) is 0.2 hours. We will consider a 20% improvement in ps and in pa, under two scenarios. In one, the application processing time (pa) is 40% of ps; in the other, pa is 20% of ps. In both cases we are considering a typical IO bound Hadoop job.

We will apply the improvement factor (n) first to ps and then to pa and see the overall improvement for the different scenarios. With n = 0.8, i.e. a 20% improvement in ps or pa, the overall improvement (in percent) from the equations above is as follows.

| | pa = 0.4 ps | pa = 0.2 ps |
|---|---|---|
| improve ps | 12.2 | 12.7 |
| improve pa | 5.0 | 3.0 |

For our specific example, as expected, improving ps has a much bigger impact on the overall performance than improving pa.
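The comparison can be rerun with a short script. Note the assumptions: the post does not state the value of h used for the table, so the sketch below assumes h = 1, and its output need not match the table digit for digit. The helper names are my own.

```python
def job_time(ov, ps, pa, h):
    """t = ov + (ps + pa) * h, as in the performance model."""
    return ov + (ps + pa) * h


def improvement_pct(ov, ps, pa, h, n, target):
    """Percent reduction in job time when `target` ("ps" or "pa") is scaled by n."""
    before = job_time(ov, ps, pa, h)
    if target == "ps":
        after = job_time(ov, n * ps, pa, h)
    elif target == "pa":
        after = job_time(ov, ps, n * pa, h)
    else:
        raise ValueError("target must be 'ps' or 'pa'")
    return 100.0 * (1 - after / before)


OV, PS, N = 0.05, 0.2, 0.8   # values given in the example
H = 1.0                      # assumption: h is not stated in the post

results = {}
for share in (0.4, 0.2):     # pa as a fraction of ps
    for target in ("ps", "pa"):
        pct = improvement_pct(OV, PS, share * PS, H, N, target)
        results[(target, share)] = pct
        print(f"improve {target}, pa = {share} ps: {pct:.1f}%")
```

Whatever h is chosen, the qualitative result is the same for these inputs: the gain from tuning ps is several times the gain from tuning pa.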

## Measuring Processing Time

To apply the equations to investigate performance we need ov, ps and pa for a job. The processing time is a linear function of the number of hours. The intercept is the overhead (ov) and the slope is the total processing time per hour of data (ps + pa).

The intercept and slope can be found easily from two data points. We could run the Hadoop job on 1 hour’s worth of data and on 4 hours’ worth of data and measure the total elapsed time for both jobs (t1 and t4). The results are as follows

ov = (4 * t1 - t4) / 3 and (ps + pa) = (t4 - t1) / 3
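As a sketch, the same two-point solve can be written for any pair of run lengths, which reduces to the formulas above for 1 and 4 hours. The function name and the timings (11 and 41 minutes) are hypothetical, chosen only to illustrate the calculation.

```python
def fit_two_points(h_a, t_a, h_b, t_b):
    """Return (ov, slope) for the line t = ov + slope * h through two runs."""
    if h_a == h_b:
        raise ValueError("the two runs must cover different numbers of hours")
    slope = (t_b - t_a) / (h_b - h_a)   # ps + pa
    ov = t_a - slope * h_a              # intercept
    return ov, slope


# Hypothetical elapsed times in minutes for 1 hour and 4 hours of data.
t1, t4 = 11.0, 41.0
ov, slope = fit_two_points(1, t1, 4, t4)

# Agrees with the closed-form expressions for the 1-hour / 4-hour case.
assert abs(ov - (4 * t1 - t4) / 3) < 1e-9
assert abs(slope - (t4 - t1) / 3) < 1e-9
print(ov, slope)
```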

We now have the total hourly processing time. How do we get the individual values? One way is to use Hadoop counters to get the elapsed time for all map and reduce executions, which will essentially give us pa. The remaining part is ps.

Since Hadoop cluster conditions may vary depending on the number of jobs running, among other things, it is important to make multiple runs at different times and take the average for t1 and t4.
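Putting the measurement steps together, a rough sketch might look like the following. The function name, the run timings and the pa value are all hypothetical. In practice pa would come from the job's counters, and how it is extracted is not covered here.

```python
from statistics import mean


def estimate_parameters(t1_runs, t4_runs, pa):
    """Average repeated runs, then split the slope into ps and pa.

    t1_runs / t4_runs: elapsed times for jobs over 1 hour and 4 hours of data.
    pa: application time per hour of data, obtained separately (assumed input).
    """
    t1 = mean(t1_runs)
    t4 = mean(t4_runs)
    ov = (4 * t1 - t4) / 3
    slope = (t4 - t1) / 3      # ps + pa
    ps = slope - pa            # what remains after the application time
    return {"ov": ov, "ps": ps, "pa": pa}


# Hypothetical timings in minutes from three runs of each job size.
params = estimate_parameters(
    t1_runs=[10.5, 11.0, 11.5],
    t4_runs=[40.0, 41.0, 42.0],
    pa=4.0,
)
print(params)
```

Averaging before solving keeps one unusually slow run from skewing both the intercept and the slope.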

## Conclusion

By taking measurements we can find out whether a Hadoop job is IO bound (more time spent in system processing) or CPU bound (more time spent in application processing). Depending on the case, we can try to improve ps or pa and investigate the overall performance improvement.

I am Pranab Ghosh, a software professional in the San Francisco Bay area. I manipulate bits and bytes for the good of living beings and the planet. I have worked with myriad of technologies and platforms in various business domains for early stage startups, large corporations and anything in between. I am an active blogger and open source project owner. I am passionate about technology and green and sustainable living. My technical interest areas are Big Data, Distributed Processing, NOSQL databases, Machine Learning and Programming languages. I am fascinated by problems that don't have neat closed form solution.

### 6 Responses to Simple Hadoop Performance Model

1. Vitthal Gogate says:

Hadoop Vaidya is a good practical tool to find performance issues with MR job based on job history analysis. I plan to contribute more rules soon

2. Kiran My says:

Thanks for the detailed article on Hadoop. Hadoop can also be used for OLAP and OLTP processing.