I was motivated to write this post after reading The Mathematics behind Hadoop Based System. When you want to improve the performance of your Hadoop application, there are many options for tweaking the system. The question is where to focus to get the biggest bang for your buck. This post addresses the issue with a simple model and an example.

## Performance Model

When processing data collected over time, the time taken for a Hadoop job can be described as

t = ov + (ps + pa) * h

- ov = the fixed overhead in running a job
- ps = processing time spent in the Hadoop system for an hour’s worth of data
- pa = processing time spent in map and reduce for an hour’s worth of data
- h = number of hours over which data is collected
- t = processing time
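The model above can be sketched as a small Python function. The function name and the sample figures (in minutes) are illustrative assumptions, not measurements from a real cluster.

```python
def job_time(ov, ps, pa, h):
    """Total job time t = ov + (ps + pa) * h.

    ov is the fixed overhead; ps and pa are the system and application
    processing times per hour of data; h is the number of hours of data.
    ov, ps and pa must use the same time unit.
    """
    return ov + (ps + pa) * h


# Hypothetical figures in minutes: 1 min overhead, 6 min of system time
# and 4 min of application time per hour of collected data.
print(job_time(ov=1.0, ps=6.0, pa=4.0, h=1))  # 11.0
print(job_time(ov=1.0, ps=6.0, pa=4.0, h=4))  # 41.0
```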

Unlike the post I mentioned, I have divided the dynamic part of the processing time into two parts, one part related to time spent in Hadoop system and the other in the application logic.

The post also talks about *stable processing time*, which corresponds to *h* being equal to *t*. In this condition, the Hadoop job will be perpetually busy, i.e., by the time a job has finished it is time for the next run. I am leaving the equation as is and not considering *stable processing time*.

There is a fixed overhead cost in running any Hadoop job, which is represented by *ov*. The overhead is mostly associated with the work involved in job submission handled by the jobtracker. For the most part, it’s independent of the data size.

The dynamic part of the time has two components. The first component, represented by *ps*, contains all the time taken by the Hadoop system for processing a job. This part is mostly disk and network IO bound. Broadly speaking, it consists of the following

- Input and output for map
- Shuffle
- Input and output for reduce

The second component represented by *pa* is the time taken by the application logic i.e. time spent in the map and reduce functions. This time is all CPU bound, unless map and reduce functions access some remote systems, which is not desirable. It contains the following

- Execution of map
- Execution of reduce

## Performance Tweaking

For a given cluster, the overhead is more or less fixed and not much can be done about it. Our focus is on the dynamic and data size dependent part of the time. We would like to investigate overall performance improvement when performance is improved for *ps* and *pa*.

Cloudera has a good post on performance tips. This Yahoo presentation in slideware also has some important performance tips. These performance tips have mostly to do with overhead and system performance time.

## System Processing Time

Let’s say we have improved *ps* by a factor of *n* by tweaking different Hadoop config parameters, so that the new system processing time is n * ps (with n less than 1). Then the overall performance improvement factor is the ratio of the two time values, as below

t2/t1 = (ov + (n * ps + pa) * h) / (ov + (ps + pa) * h)

After some juggling this yields

t2/t1 = n + (1 - n) * ((ov + pa * h) / (ov + ps * h + pa * h))

We would like the second term to be as small as possible, which happens when *ps* dominates over *ov* and *pa*. When *ps* is large, the overall performance improvement will be close to the performance improvement in *ps*.
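As a quick numerical check of the algebra, the sketch below evaluates both forms of the ratio and confirms they agree. The function names and input values are assumptions chosen for illustration.

```python
import math


def ratio_direct(ov, ps, pa, h, n):
    """t2/t1 when ps is scaled by n (n < 1 means faster)."""
    return (ov + (n * ps + pa) * h) / (ov + (ps + pa) * h)


def ratio_rearranged(ov, ps, pa, h, n):
    """Same ratio, written as n plus (1 - n) times the share of time outside ps."""
    rest_share = (ov + pa * h) / (ov + ps * h + pa * h)
    return n + (1 - n) * rest_share


# Hypothetical values, in hours.
args = dict(ov=0.05, ps=0.2, pa=0.08, h=1, n=0.8)
assert math.isclose(ratio_direct(**args), ratio_rearranged(**args))

# As ps grows relative to ov and pa, the overall ratio moves towards n.
for ps in (0.2, 2.0, 20.0):
    print(ps, round(ratio_direct(0.05, ps, 0.08, 1, 0.8), 3))
```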

## Application Processing Time

If we improve *pa* by a factor of *n*, by optimizing map and reduce code, the overall performance is as follows

t2/t1 = (ov + (ps + n * pa) * h) / (ov + (ps + pa) * h)

which simplifies to

t2/t1 = n + (1 - n) * ((ov + ps * h) / (ov + ps * h + pa * h))

We will get the best overall performance improvement when *pa* dominates over *ov* and *ps*, which is typically not the case for most Hadoop jobs, unless you have very complex CPU intensive logic in your map and reduce functions.
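To illustrate why the gain depends on how much of the job is application time, the sketch below compares an IO heavy job with a CPU heavy one. Both sets of figures are made up for the comparison.

```python
def ratio_after_pa_speedup(ov, ps, pa, h, n):
    """t2/t1 when pa is scaled by n, using the rearranged form of the ratio."""
    rest_share = (ov + ps * h) / (ov + ps * h + pa * h)
    return n + (1 - n) * rest_share


n = 0.8  # application code made 20% faster

# Hypothetical IO heavy job: system time dominates (values in hours).
io_heavy = ratio_after_pa_speedup(ov=0.05, ps=0.2, pa=0.04, h=1, n=n)

# Hypothetical CPU heavy job: application time dominates.
cpu_heavy = ratio_after_pa_speedup(ov=0.05, ps=0.04, pa=0.2, h=1, n=n)

# A ratio closer to n means more of the 20% speedup shows up in total job time.
print(f"IO heavy job:  t2/t1 = {io_heavy:.3f}")
print(f"CPU heavy job: t2/t1 = {cpu_heavy:.3f}")
```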

## Pseudo Real Time Processing

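Although Hadoop is a batch processing system, we can get some insight into how well it may work for near real time processing. By equating *t* and *h*, with *ps* and *pa* expressed as processing time per unit of data collection time (so that they are plain fractions), we get the minimum possible latency for near real time processing by Hadoop. It is as follows

t = ov / (1 - (pa + ps))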

To take an example, let’s say *ov = 1 min, ps = 6 min and pa = 4 min*. To process an hour’s worth of data it takes 11 min in this example. Expressed per minute of data, *ps + pa = 10/60*, so plugging these values into the equation above gives *t = 1 / (1 - 10/60) = 1.2 min = 72 sec*.

To get the minimum possible latency for this example, we have to run Hadoop every *72 sec* and it will process data collected over the last *72 sec*. So the latency will be between *72 sec* and *144 sec*.
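The latency calculation can be reproduced with a few lines of Python. This is a minimal sketch; the function name is an assumption, and *ps* and *pa* are passed as fractions (minutes of processing per minute of data).

```python
def min_latency(ov, ps, pa):
    """Solve t = ov + (ps + pa) * t for t.

    ps and pa are fractions: processing time per unit of data collection
    time. The result is in the same unit as ov.
    """
    rate = ps + pa
    if rate >= 1:
        raise ValueError("the job cannot keep up: ps + pa must be below 1")
    return ov / (1 - rate)


ov_sec = 60.0   # 1 min of overhead
ps = 6 / 60     # 6 min of system time per 60 min of data
pa = 4 / 60     # 4 min of application time per 60 min of data

t = min_latency(ov_sec, ps, pa)
print(round(t), round(2 * t))  # 72 144
```

The guard on `ps + pa` reflects that the equation only has a positive solution when an hour of data takes less than an hour to process.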

In real life it’s not advisable to run Hadoop for small datasets, unless the data collection rate is so high that a significant amount of data is collected in a 72 sec window.

## Example

We will use this example to show some numbers for overall performance improvement.

n = 0.8 and ov = 0.05 hours and ps = 0.2 hours

The overhead time is 0.05 hours and the system processing time (*ps*) is 0.2 hours. We will consider a 20% improvement in *ps* and in *pa*, under two scenarios. In the first, application processing time (*pa*) is 40% of *ps*; in the second, *pa* is 20% of *ps*. Both describe a typical IO bound Hadoop job.

We will apply the improvement factor (*n*) to *ps* and then to *pa* and see the overall improvement in each scenario. With *n* = 0.8, i.e., a 20% improvement in *ps* or *pa*, the equations above give the following overall improvements.

| Overall improvement (%) | pa = 0.4 ps | pa = 0.2 ps |
|---|---|---|
| improve ps | 12.2 | 12.7 |
| improve pa | 5.0 | 3.0 |

For our specific example, as expected, improving *ps* has a much bigger impact on the overall performance than improving *pa*.
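The comparison can be recomputed with a short script. The example does not state *h*, so `h = 1` below is an assumption, and the printed percentages depend on it; they need not match the table to the decimal. The function name is also hypothetical.

```python
def improvement_pct(ov, ps, pa, h, n, target):
    """Percent reduction in total job time when `target` ('ps' or 'pa') is scaled by n."""
    before = ov + (ps + pa) * h
    if target == "ps":
        after = ov + (n * ps + pa) * h
    elif target == "pa":
        after = ov + (ps + n * pa) * h
    else:
        raise ValueError("target must be 'ps' or 'pa'")
    return 100 * (1 - after / before)


ov, ps, n = 0.05, 0.2, 0.8
h = 1  # assumption: the example does not say how many hours of data are processed

for pa_share in (0.4, 0.2):
    pa = pa_share * ps
    for target in ("ps", "pa"):
        pct = improvement_pct(ov, ps, pa, h, n, target)
        print(f"pa = {pa_share} ps, improve {target}: {pct:.1f}%")
```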

## Measuring Processing Time

To apply the equations to investigate performance we need *ov*, *ps* and *pa* for a job. The processing time is a linear function of the number of hours. The intercept is the overhead (*ov*) and the slope is the total processing time per hour of data (*ps* + *pa*).

The intercept and slope can be found easily from two data points. We could run the Hadoop job for 1 hour’s worth of data and 4 hours’ worth of data and measure the total elapsed time for both jobs (t1 and t4). The results are as follows

ov = (4 * t1 - t4) / 3 and (ps + pa) = (t4 - t1) / 3
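The two formulas translate directly into code. In this sketch the function name and the elapsed times (in minutes) are hypothetical.

```python
def fit_two_points(t1, t4):
    """Recover the overhead and the per-hour slope from two measured job times.

    t1 is the elapsed time for 1 hour of data, t4 for 4 hours of data.
    Returns (ov, ps + pa).
    """
    slope = (t4 - t1) / 3
    ov = (4 * t1 - t4) / 3
    return ov, slope


# Hypothetical measurements in minutes.
ov, slope = fit_two_points(t1=11.0, t4=41.0)
print(ov, slope)  # 1.0 10.0
```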

We have the total hourly processing time. How do we get the individual values? One way is to use Hadoop counters to record the elapsed time of all map and reduce executions, which essentially gives us *pa*. The remaining part is *ps*.
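One way to record application time in a counter is sketched below for a Hadoop Streaming mapper written in Python, which is an assumption about how the job is written. Hadoop Streaming updates a counter when a task writes a line of the form `reporter:counter:<group>,<counter>,<amount>` to stderr. The group name `Profile`, the counter name `MapAppMillis` and the word count logic are hypothetical.

```python
import io
import sys
import time


def run_mapper(lines, out=sys.stdout, err=sys.stderr):
    """Word-count style mapper that reports the time spent in its own logic."""
    app_seconds = 0.0
    for line in lines:
        start = time.perf_counter()
        pairs = [(word, 1) for word in line.split()]  # application logic being timed
        app_seconds += time.perf_counter() - start
        for word, count in pairs:
            out.write(f"{word}\t{count}\n")
    # Counter amounts are integers, so report whole milliseconds.
    millis = int(app_seconds * 1000)
    err.write(f"reporter:counter:Profile,MapAppMillis,{millis}\n")


# In a streaming job the mapper script would call run_mapper(sys.stdin).
# Here in-memory streams stand in for the real ones.
out, err = io.StringIO(), io.StringIO()
run_mapper(["a b a"], out, err)
print(out.getvalue(), end="")
print(err.getvalue(), end="")
```

Summing this counter over all map tasks, and a matching one over all reduce tasks, gives an estimate of the application time for the job.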

Since Hadoop cluster conditions may vary depending on the number of jobs running, among other things, it is important to make multiple runs at different times and take the average for t1 and t4.
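Averaging several runs and then splitting the slope into *ps* and *pa* could look like the sketch below. The run times, the counter based value for *pa*, and the function name are all hypothetical.

```python
from statistics import mean


def estimate_parameters(t1_runs, t4_runs, pa):
    """Estimate (ov, ps, pa) from repeated runs over 1 hour and 4 hours of data.

    pa is the application time per hour of data, for example taken from
    counters that time the map and reduce functions.
    """
    t1 = mean(t1_runs)
    t4 = mean(t4_runs)
    slope = (t4 - t1) / 3       # ps + pa
    ov = (4 * t1 - t4) / 3
    ps = slope - pa             # what is left of the slope is system time
    return ov, ps, pa


# Hypothetical elapsed times in minutes, measured at different times of day.
ov, ps, pa = estimate_parameters(
    t1_runs=[10.5, 11.0, 11.5],
    t4_runs=[40.0, 41.0, 42.0],
    pa=4.0,
)
print(ov, ps, pa)  # 1.0 6.0 4.0
```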

## Conclusion

By taking measurements we can find out if a Hadoop job is IO bound (more time spent in system processing) or CPU bound (more time spent in application processing). Depending on the case, we can try to improve *ps* or *pa* and investigate the overall performance improvement.

Hadoop Vaidya is a good practical tool for finding performance issues with MR jobs based on job history analysis. I plan to contribute more rules soon.

Thanks for the detailed article on Hadoop. Hadoop can also be used for OLAP and OLTP processing.


Thanks for the quick analysis and description. I however want to know how do you get the ‘ps’ time for I/O and network? I can only see “CPU time spent (ms)” reported by hadoop. Thanks,

Here is one way. Profile the time spent in your map and reduce application logic and emit it as Hadoop counters. This will be pa. If you subtract that from the total elapsed time for your Hadoop job, what remains is the time used by the Hadoop framework. That will be ps.