Multi Cluster Hadoop Job Monitoring


I spend a lot of time tracking and monitoring Hadoop jobs running across multiple clusters in my current project. Typically I navigate between multiple JobTracker web admin consoles. Although the job tracker web console gives some basic system level statuses and metrics for the Hadoop daemons, it leaves a lot to be desired. What's missing is a monitoring platform at the application level.

In Hadoop, the only way to publish the internal health of an application is through so-called job counters. A job counter belongs to a counter group and has a numeric value.

In my Hadoop job I may have a counter that gets incremented when a certain kind of exception gets thrown. I may want to see the counter value in a dashboard and also have an alarm raised when the value exceeds a threshold. Other error conditions worth counting are invalid data and missing data.
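
For example, a mapper built on the old mapred API (the same API family estado itself uses) could publish such an error counter as shown below. This is just a minimal sketch; the class, group and counter names are illustrative and not anything estado expects.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

//illustrative mapper that counts records it fails to parse
public class ParseMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        try {
            //second comma separated field is expected to be numeric
            int amount = Integer.parseInt(value.toString().split(",")[1].trim());
            output.collect(new Text("total"), new IntWritable(amount));
        } catch (Exception ex) {
            //the counter shows up under its group in the job counters
            reporter.incrCounter("ErrorConditions", "invalidData", 1);
        }
    }
}

The counter appears in the job tracker console under its group and, once collected by the agent described below, could be charted or checked against an alert threshold.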

Another serious limitation of the job tracker console is the maximum number of jobs that are retained; older jobs simply disappear from the list of jobs in the console.

My Wish List

If I had to prepare a list of features I would like to see in a Hadoop application monitoring platform, it would be as follows. The focus is on application level monitoring across multiple clusters.

  1. Consolidated view of Hadoop job status and metrics across clusters.
  2. Retain history of all jobs run in all clusters.
  3. Be able to search jobs based on the job name or notes field.
  4. Be able to generate alerts for different conditions, e.g., when a job has run longer than a predefined threshold.
  5. Consolidated view of all job counters from all jobs running in all clusters.
  6. Be able to generate alerts when the value of certain counters has exceeded a threshold.

Monitoring Eco System

I set out to build a set of simple tools for an application monitoring ecosystem and make them available as open source. The first component is a monitoring agent, called estado, that collects data on job status and counters. There is one instance of this agent per cluster, typically set up as a cron job that runs periodically, e.g., once every few minutes.

The second component is a database backed web application, called starea. Its database aggregates the data arriving from all the agents running in the Hadoop clusters. The third component is a Hadoop workflow engine called fluxua, which I blogged about earlier; it is hosted on GitHub. The workflow engine manages some job related meta data and saves it in the starea database after a job has run.

Collection Agent

The focus of this post is the data collection agent, estado. It has a very simple pluggable architecture and consists of only a handful of Java classes. It uses the Hadoop JobClient API and the job status shell command to get job statuses and job counters.

Job statuses and counters may be consumed by different kinds of consumers. Out of the box, estado provides a console consumer and an RDBMS consumer. Here is a sample configuration file showing both.

cluster.name=default
status.filter=srfpk
consumer.console.class=org.estado.spi.JobStatusConsoleConsumer
consumer.rdbms.class=org.estado.spi.JobStatusRdbmsConsumer
consumer.rdbms.url=jdbc:mysql://localhost/starea?user=admin&password=welcome

Since job statuses across multiple clusters are aggregated, with estado running in each cluster, the configuration has a cluster name. The status filter parameter allows jobs to be filtered by status. The value srfpk corresponds to succeeded, running, failed, preparing and killed jobs respectively. If someone is interested only in monitoring running jobs, the value would be set to r.

Multiple consumers may be configured. Each consumer has a name, e.g., rdbms. A Java class name must be specified for every consumer, and the class needs to implement the interface JobStatusConsumer. Any additional parameters needed by a consumer may be specified in the configuration; the consumer class should then have the corresponding Java bean properties. For the RDBMS consumer, the additional parameter is a JDBC URL.

Additional consumers may easily be implemented and plugged in, using this plugin framework, to send job status and metrics data over to other applications or repositories.
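
To make the plugin idea concrete, here is a rough sketch of what such a custom consumer could look like, in this case one that appends a summary line per job to a plain text file. Only the JobStatusConsumer interface with its handle() method, and the getJobId() and getStatus() accessors used in checkStatus() further down, come from estado; the class name, the path bean property and the matching consumer.logfile.* configuration keys are hypothetical, and I am assuming the interface and job status classes live in the org.estado.spi package like the bundled consumers.

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;

import org.estado.spi.JobStatus;
import org.estado.spi.JobStatusConsumer;

//hypothetical consumer that appends one summary line per job to a text file
public class JobStatusLogFileConsumer implements JobStatusConsumer {
    //set from consumer.logfile.path in the configuration through the bean property
    private String path;

    public void setPath(String path) {
        this.path = path;
    }

    public void handle(List<JobStatus> jobStatuses) {
        PrintWriter out = null;
        try {
            out = new PrintWriter(new FileWriter(path, true));
            for (JobStatus jobStatus : jobStatuses) {
                out.println(jobStatus.getJobId() + "\t" + jobStatus.getStatus());
            }
        } catch (IOException ex) {
            System.out.println("Log file consumer failed: " + ex.getMessage());
        } finally {
            if (out != null) {
                out.close();
            }
        }
    }
}

It would then be wired in with entries along the lines of consumer.logfile.class and consumer.logfile.path in the configuration file, assuming the configuration key suffix maps to the bean property of the same name.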

Hadoop provides a Ganglia context for collecting metrics from the JobTracker and HDFS daemons. A Ganglia consumer could easily be implemented for estado; similarly, someone may be interested in a consumer that collects data in a NOSQL database like Cassandra.

In addition to the data collected through the Hadoop API and the job status command, any meta data about a job that the job implementation publishes through job counters gets collected as well. All such job meta data needs to be published under the counter group JobInfo. Currently estado supports the counters jobName and notes. As the names suggest, jobName is the name of the job and notes is any descriptive text about the job.
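
Judging from the sample console output later in this post, the convention seems to be that the actual job name and notes text are carried in the counter name itself, after a jobName: or notes: prefix, while the counter value is just an increment count. Under that assumption, a job could publish its meta data from a map or reduce method with a couple of calls like these (the Reporter handle is the one passed in by the old mapred API, and the text after the prefixes is of course job specific):

//publish job meta data under the JobInfo counter group; estado picks up
//everything after the jobName: and notes: prefixes (convention inferred
//from the sample output below, so treat this as an approximation)
reporter.incrCounter("JobInfo", "jobName:log processing 2011-07-22-14", 1);
reporter.incrCounter("JobInfo", "notes:any comment", 1);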

Here is the kernel of the collection agent: the method checkStatus() in the class JobStatusChecker. It collects the job statuses and counters and calls all the configured consumers. The full source code for estado is hosted on GitHub and is fully functional.


public void checkStatus(){
    List<org.estado.spi.JobStatus> jobStatusList = new ArrayList<org.estado.spi.JobStatus>();

    try {
        Configuration conf = new Configuration();
        JobClient client = new JobClient(new JobConf(conf));
        JobStatus[] jobStatuses = client.getAllJobs();
        showFilter();

        int jobCount = 0;
        for (JobStatus jobStatus : jobStatuses) {
            //latest finish time across all map and reduce tasks
            Long lastTaskEndTime = 0L;
            TaskReport[] mapReports = client.getMapTaskReports(jobStatus.getJobID());
            for (TaskReport r : mapReports) {
                if (lastTaskEndTime < r.getFinishTime()) {
                    lastTaskEndTime = r.getFinishTime();
                }
            }
            TaskReport[] reduceReports = client.getReduceTaskReports(jobStatus.getJobID());
            for (TaskReport r : reduceReports) {
                if (lastTaskEndTime < r.getFinishTime()) {
                    lastTaskEndTime = r.getFinishTime();
                }
            }
            //setup and cleanup task reports are fetched but not used here
            client.getSetupTaskReports(jobStatus.getJobID());
            client.getCleanupTaskReports(jobStatus.getJobID());

            String jobId = jobStatus.getJobID().toString();
            Long startTime = jobStatus.getStartTime();
            String user = jobStatus.getUsername();
            int mapProgress = (int)(jobStatus.mapProgress() * 100);
            int reduceProgress = (int)(jobStatus.reduceProgress() * 100);
            org.estado.spi.JobStatus jobStat = null;
            ++jobCount;

            //map the Hadoop run state to an estado job status, honoring the status filter
            int runState = jobStatus.getRunState();
            switch (runState) {
                case JobStatus.SUCCEEDED:
                    if (filter.contains("s")) {
                        Long duration = lastTaskEndTime - jobStatus.getStartTime();
                        jobStat = new org.estado.spi.JobStatus(cluster, jobId, null, null, user, startTime,
                                lastTaskEndTime, duration, mapProgress, reduceProgress, "completed");
                        ++sCount;
                    }
                    break;

                case JobStatus.RUNNING:
                    if (filter.contains("r")) {
                        long duration = System.currentTimeMillis() - jobStatus.getStartTime();
                        jobStat = new org.estado.spi.JobStatus(cluster, jobId, null, null, user, startTime,
                                lastTaskEndTime, duration, mapProgress, reduceProgress, "running");
                        ++rCount;
                    }
                    break;

                case JobStatus.FAILED:
                    if (filter.contains("f")) {
                        long duration = lastTaskEndTime - jobStatus.getStartTime();
                        jobStat = new org.estado.spi.JobStatus(cluster, jobId, null, null, user, startTime,
                                lastTaskEndTime, duration, mapProgress, reduceProgress, "failed");
                        ++fCount;
                    }
                    break;

                case JobStatus.PREP:
                    if (filter.contains("p")) {
                        jobStat = new org.estado.spi.JobStatus(cluster, jobId, null, null, user, null,
                                null, null, 0, 0, "preparing");
                        ++pCount;
                    }
                    break;

                case JobStatus.KILLED:
                    if (filter.contains("k")) {
                        long duration = lastTaskEndTime - jobStatus.getStartTime();
                        jobStat = new org.estado.spi.JobStatus(cluster, jobId, null, null, user, startTime,
                                lastTaskEndTime, duration, mapProgress, reduceProgress, "killed");
                        ++kCount;
                    }
                    break;
            }

            //jobs excluded by the status filter are skipped
            if (jobStat != null) {
                jobStatusList.add(jobStat);
            }
        }

        //get counters
        for (org.estado.spi.JobStatus jobStat : jobStatusList) {
            if (!jobStat.getStatus().equals("preparing")) {
                List counterGroups = getJobCounters(jobStat.getJobId());
                jobStat.setCounterGroups(counterGroups);

                //additional data from counters
                setJobInfo(jobStat);
            }
        }

        //publish to all consumers
        for (JobStatusConsumer consumer : consumers) {
            consumer.handle(jobStatusList);
        }

        showJobCounts();
    } catch (Exception ex) {
        System.out.println("Jobs status checker failed: " + ex.getMessage());
    }
}

Database Schema

The database schema used by the JobStatusRdbmsConsumer is as follows. It uses two tables: jobs and metrics.

jobs:
+-----------------+--------------+------+-----+---------+----------------+
| Field           | Type         | Null | Key | Default | Extra          |
+-----------------+--------------+------+-----+---------+----------------+
| id              | int(11)      | NO   | PRI | NULL    | auto_increment |
| jobid           | varchar(255) | YES  |     | NULL    |                |
| cluster         | varchar(255) | YES  |     | NULL    |                |
| user            | varchar(255) | YES  |     | NULL    |                |
| start_time      | datetime     | YES  |     | NULL    |                |
| end_time        | datetime     | YES  |     | NULL    |                |
| duration        | int(11)      | YES  |     | NULL    |                |
| name            | varchar(255) | YES  |     | NULL    |                |
| status          | varchar(255) | YES  |     | NULL    |                |
| notes           | varchar(255) | YES  |     | NULL    |                |
| estimated_time  | int(11)      | YES  |     | NULL    |                |
| job_type_id     | int(11)      | YES  |     | NULL    |                |
| flow_id         | int(11)      | YES  |     | NULL    |                |
| created_at      | datetime     | YES  |     | NULL    |                |
| updated_at      | datetime     | YES  |     | NULL    |                |
| map_progress    | int(11)      | YES  |     | NULL    |                |
| reduce_progress | int(11)      | YES  |     | NULL    |                |
+-----------------+--------------+------+-----+---------+----------------+

metrics:
+------------+--------------+------+-----+---------+----------------+
| Field      | Type         | Null | Key | Default | Extra          |
+------------+--------------+------+-----+---------+----------------+
| id         | int(11)      | NO   | PRI | NULL    | auto_increment |
| context    | varchar(255) | YES  |     | NULL    |                |
| category   | varchar(255) | YES  |     | NULL    |                |
| name       | varchar(255) | YES  |     | NULL    |                |
| value      | int(11)      | YES  |     | NULL    |                |
| job_id     | int(11)      | YES  |     | NULL    |                |
| created_at | datetime     | YES  |     | NULL    |                |
| updated_at | datetime     | YES  |     | NULL    |                |
+------------+--------------+------+-----+---------+----------------+

Sample Output

Here is the sample output from JobStatusConsoleConsumer.

Cluster:default
Job Id:job_201107171959_0001
User:pranab
Start time:2011-07-17 08:04:03
End time:2011-07-17 08:06:20
Duration:02min:17sec:230ms
Map progress:100
Reduce progress:100
Status:completed

Counter group:Job Counters
	Launched reduce tasks=1
	SLOTS_MILLIS_MAPS=81621
	Total time spent by all reduces waiting after reserving slots (ms)=0
	Total time spent by all maps waiting after reserving slots (ms)=0
	Launched map tasks=1
	Data-local map tasks=1
	SLOTS_MILLIS_REDUCES=52691
Counter group:FileSystemCounters
	FILE_BYTES_READ=349
	HDFS_BYTES_READ=25157
	FILE_BYTES_WRITTEN=115394
	HDFS_BYTES_WRITTEN=331
Counter group:Map-Reduce Framework
	Reduce input groups=2
	Combine output records=2
	Map input records=1000
	Reduce shuffle bytes=349
	Reduce output records=2
	Spilled Records=4
	Map output bytes=19776
	Combine input records=2000
	Map output records=2000
	SPLIT_RAW_BYTES=111
	Reduce input records=2
Counter group:JobInfo
        jobName:log processing 2011-07-22-14=6
        notes:any comment=6

The counter group JobInfo can be used by the map reduce job implementor to publish meta data about jobs. In this case it publishes the job name and some descriptive text about the job. Unfortunately, the Hadoop JobStatus class does not provide the job name, and this is the only sneaky way to get it out. Only if these counters are available can the job name and notes be extracted and passed to the consumer plugins.

Going Forward

I am also working on a Rails based web application, called starea, around the database tables populated by JobStatusRdbmsConsumer. It provides a consolidated view of job status and counters. I have plans to add some search functions.

I may also provide a way to define alert conditions based on job counter values and show those counters in a clearly distinguishable way, through color and styling, when the alert conditions are met.

Another nice feature will be to expose the data in starea through JMX for integration with other application monitoring systems.



6 Responses to Multi Cluster Hadoop Job Monitoring

  1. danoomistmatiste says:

    Very nice post. I think this level of process level monitoring is much needed for a large Hadoop cluster. I have a couple of questions. We do have monitoring needs of this kind, however with every map and reduce task spawned by the job tracker, we need to show cpu and memory usage as well. Does your app let you do that? Second question is, you mention estado, so I presume this is similar to the Ganglia gmetad and the collection agents are like gmond that will run on each node of the cluster. So how do you implement these collection agents?

  2. pkghosh says:

    danoomistmatiste:
    Thanks. Regarding your first question, a Hadoop Context class could easily be created to collect jvm metrics and populate the metrics table. The schema for the metrics table has been designed with jvm and other Hadoop daemon metrics in mind. This metrics context will have to be configured in each node of the Hadoop cluster.
    Regarding the second question, the collection agent is already there as the JobStatusChecker class. Take a look at GitHub for the full source code. It comes with an RDBMS consumer that writes data to an RDBMS. If your database schema is different or you want to use a NOSQL db as the repository, you need to write your own consumer class by implementing the interface JobStatusConsumer and configure it using the plugin mechanism.

    Hope your questions are answered.
    Pranab

  3. raulchen says:

    Counters can be retrieved in this way:
    RunningJob job=client.getJob(js.getJobID());
    Counters counters=job.getCounters();
    I think it’s better than shell command 🙂

  4. Pingback: Multi Cluster Hadoop Job Monitoring | BigDataCloud.com

  5. Pingback: Outils pour étalonner Apache Hadoop | Big Loupe

  6. the_outsider says:

    very interesting post, keep it going!
