Folding, Cross Validation with Map Reduce

In my current web log data mining project using Hadoop, I am trying to build a predictive model for predicting certain attributes of web site visitors. I have a number of ETL (Extract, Transform and Load) tasks to prepare the data before running the data mining algorithm. These tasks have been implemented as a series of MR (Map Reduce) jobs. One of those tasks is what is known as folding, in data mining parlance. In this post, I will go over how I have implemented it as an MR job.

In data mining, the available data set is split into two sets: one used for training the predictive model, and the other used for validating the model trained on the first. Typically this is done by dividing the data set into n subsets. Out of those n subsets, n-1 are used for training and the remaining subset is used for validation. You may have multiple model training iterations, and in each iteration you use a different set of n-1 subsets for training. You end up with an ensemble of models, one from each training iteration. You can keep them all for prediction, or you may keep only those that meet some predefined validation criteria.

For example, with n = 10 and 100000 rows of data, rows 0 – 89999 are used for training and rows 90000 – 99999 for validation in the first iteration. In the next iteration, rows 0 – 79999 and 90000 – 99999 are used for training and rows 80000 – 89999 for validation, and so forth.
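The fold rotation in this example can be sketched in plain Java. This is a minimal sketch, assuming the row count divides evenly by n and numbering iterations so the last fold is validated first, as above; the class and method names are hypothetical and not part of my job.

```java
// Sketch: validation row range for each iteration of n-fold cross validation.
// Assumes numRows is divisible by numFolds; iteration 0 validates the last fold.
public class KFoldRanges {

    /** Returns {firstRow, lastRow} (inclusive) of the validation fold for an iteration. */
    static long[] validationRange(long numRows, int numFolds, int iteration) {
        long foldSize = numRows / numFolds;
        int fold = numFolds - 1 - iteration;   // iteration 0 -> last fold, 1 -> second to last, ...
        long first = fold * foldSize;
        return new long[] {first, first + foldSize - 1};
    }

    public static void main(String[] args) {
        long numRows = 100000;
        int numFolds = 10;
        for (int it = 0; it < numFolds; it++) {
            long[] v = validationRange(numRows, numFolds, it);
            // every row outside the validation range is used for training in this iteration
            System.out.printf("iteration %d: validate rows %d-%d, train on the other %d rows%n",
                    it, v[0], v[1], numRows - (v[1] - v[0] + 1));
        }
    }
}
```

Every row lands in exactly one validation fold across the n iterations, which is the property the simpler approach below gives up.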

Since MR is a stateless, functional programming model, implementing folding the way I described above is problematic. Moreover, I would need multiple MR jobs, including one to count the number of rows. So I decided to take a simpler approach that partitions the data into 2 sets, given the percentage of rows that need to go into the validation set. I also want the training and validation data to be written into 2 separate files.

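In my map task, I have some simple sampling logic based on a random number generator, which decides whether a row is emitted under the key “training” or “validation”. Here random is a java.util.Random and keyOut a reusable Text, both held as fields of the mapper.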

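protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  // draw an integer in [0, 100); a draw below validationSetSize routes the row to validation
  int sample = random.nextInt(100);
  if (sample < validationSetSize) {
    keyOut.set("validation");
  } else {
    keyOut.set("training");
  }
  context.write(keyOut, value);
}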

The parameter validationSetSize is the percentage of rows that should go into the validation set, and it is provided through a custom Hadoop property. A typical value might be 10. The value is read from the configuration and saved in the setup method of my map task, as below:

protected void setup(Context context) throws IOException, InterruptedException {
  // percentage of rows routed to the validation set; defaults to 10 if the property is not set
  validationSetSize = context.getConfiguration().getInt("validationSetSize", 10);
}
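To check the split locally without a cluster, the mapper's keying rule can be replayed in plain Java. This is a sketch only: keyFor and countValidation are hypothetical helper names, and the fixed seed is an assumption added so runs are repeatable (the mapper itself does not need one).

```java
import java.util.Random;

// Hadoop-free simulation of the map-side split decision described above.
public class SplitSimulator {

    /** Same rule as the mapper: a draw in [0, 100) below validationSetSize means "validation". */
    static String keyFor(Random random, int validationSetSize) {
        int sample = random.nextInt(100);
        return sample < validationSetSize ? "validation" : "training";
    }

    /** Counts how many of numRows rows would be keyed "validation". */
    static int countValidation(int numRows, int validationSetSize, long seed) {
        Random random = new Random(seed);   // fixed seed so the simulation is repeatable
        int validation = 0;
        for (int i = 0; i < numRows; i++) {
            if (keyFor(random, validationSetSize).equals("validation")) {
                validation++;
            }
        }
        return validation;
    }

    public static void main(String[] args) {
        int numRows = 100000;
        int validation = countValidation(numRows, 10, 42L);
        System.out.println("validation rows: " + validation
                + ", training rows: " + (numRows - validation));
    }
}
```

With validationSetSize of 10, the validation count comes out close to, but generally not exactly, 10000.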

The reducer method is as follows:

protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  // drop the "training"/"validation" key so each row is written out unchanged
  for (Text value : values) {
    context.write(NullWritable.get(), value);
  }
}

My reducer simply emits the value with null key, so that in the output file I will have my rows from the input file as is.

It may sound shocking, but so far I have not accomplished anything.  The output of my map reduce job will simply be a copy of my input file. Although my map task partitioned the data under 2 separate keys, once processed through the reducer, I will have only one output file. But I wanted the training and validation data set to be written out to two separate files.

By default, Hadoop creates only one reducer task, which processes all of the map output; that does not work for us. Fortunately, we can set the number of reducer tasks in the driver for the MR job, as below. We need to set the value to two: one for the training set and the other for the validation set.

job.setNumReduceTasks(2);  // job is the org.apache.hadoop.mapreduce.Job configured in the driver

Are we done yet? Not so fast. In MR the map output is partitioned, and each partition is sent to a separate reducer task instance. There are as many partitions as there are reducer tasks.

The default partitioning logic in Hadoop is essentially the following:

hash(key) % numReducer

Since we have only two key values, there is no guarantee that they will not fall into the same partition. If that happens, all map output will go to the same partition and hence the same reducer. As a result we will have only one output file, and the other reducer will sit idle. I would call that load unbalancing.
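The collision risk can be illustrated with the formula Hadoop's HashPartitioner uses, (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. In this sketch String.hashCode stands in for Text.hashCode, which is computed over the key's bytes and may differ, so the partition numbers printed for “training” and “validation” are not necessarily what a real job would produce.

```java
// Sketch of the default hash partitioning rule, with String.hashCode standing in
// for Text.hashCode (an assumption; the real Text hash may give different numbers).
public class DefaultPartitionDemo {

    static int defaultPartition(String key, int numReducers) {
        // mask the sign bit so a negative hash code still yields a non-negative partition
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        int numReducers = 2;
        for (String key : new String[] {"training", "validation"}) {
            System.out.println(key + " -> partition " + defaultPartition(key, numReducers));
        }
        // "Aa" and "BB" have the same String hash code, so they always share a partition
        System.out.println("Aa -> " + defaultPartition("Aa", numReducers)
                + ", BB -> " + defaultPartition("BB", numReducers));
    }
}
```

Nothing in the hash ties distinct keys to distinct partitions, which is why a custom partitioner is needed.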

What we would like is for the two keys to map to two separate partitions. For that to happen, we need to write a custom partitioner, registered in the driver with job.setPartitionerClass, with a getPartition method as below:

public int getPartition(Text key, Text value, int numPart) {
  // "training" rows go to partition 0; everything else ("validation") goes to partition 1
  int part = key.toString().equals("training") ? 0 : 1;
  return part;
}

We simply force all map output with key “training” into one partition and all map output with key “validation” into the other. The two reducer tasks will process two separate sets of data, and each will produce its own output file.

So we end up with two output files, part-r-00000 and part-r-00001, one containing the “training” data set and the other the “validation” data set. How do we know which contains what? Since our partitioner sends “training” to partition 0, and the reducer handling partition 0 writes part-r-00000, that file should hold the training data. The file sizes confirm it: if we had chosen a value of 10 for validationSetSize, one file will be roughly 9 times larger than the other, and the larger file contains the “training” data set.

It would have been nice if we could name the output files. With Hadoop that’s possible. I will cover that in a separate post.

Lastly, the data will not be partitioned exactly as per the value of validationSetSize, because of the sampling we are doing in the map method based on a random number generator. With a value of 10 for validationSetSize, the “training” data set may not contain exactly 90% of the data. But the small variation is inconsequential.
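To put a rough number on that variation, assume each row goes to the validation set independently with probability p = validationSetSize / 100, which is what the nextInt(100) draw does. The validation row count V is then binomially distributed; for the earlier example of N = 100000 rows and validationSetSize of 10:

```latex
\[
\mathbb{E}[V] = Np = 100000 \times 0.1 = 10000, \qquad
\sigma_V = \sqrt{Np(1-p)} = \sqrt{100000 \times 0.1 \times 0.9} = \sqrt{9000} \approx 94.9
\]
```

So the validation set would typically land within about 3 standard deviations, roughly 285 rows, of 10000, i.e. within about 0.3 percentage points of 10% of the data.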

We still have a load balancing issue for the reducers. With 10% validation data, one of the reducer tasks will process 90% of the data. One way to address that is to improve the getPartition logic by having multiple partitions for the “training” data, instead of the single one we have now.
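One possible version of that improvement is sketched below in plain Java: “training” rows are spread over numPart - 1 partitions using the row's own hash, and the last partition is reserved for “validation”. This is an assumption about how it could be done, not something I have implemented; partitionFor is a hypothetical stand-in for getPartition(Text, Text, int), and it assumes numPart is at least 2.

```java
// Sketch: spread "training" rows over partitions 0..numPart-2 and send
// "validation" rows to the last partition. Assumes numPart >= 2.
public class BalancedSplitPartition {

    static int partitionFor(String key, String value, int numPart) {
        if (key.equals("validation")) {
            return numPart - 1;   // last partition holds all validation rows
        }
        // hash the row itself so training rows are spread over the remaining partitions
        return (value.hashCode() & Integer.MAX_VALUE) % (numPart - 1);
    }

    public static void main(String[] args) {
        int numPart = 10;   // e.g. 9 training partitions + 1 validation partition
        String[] rows = {"row-a", "row-b", "row-c", "row-d"};
        for (String row : rows) {
            System.out.println("training " + row + " -> " + partitionFor("training", row, numPart));
        }
        System.out.println("validation row-e -> " + partitionFor("validation", "row-e", numPart));
    }
}
```

With 10% validation data and job.setNumReduceTasks(10), each reducer would handle roughly 10% of the rows, at the cost of the training set being spread over 9 output files instead of one. With numPart of 2 the logic reduces to the original partitioner.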

Map tasks will be load balanced by Hadoop. Typically there will be as many map tasks as there are HDFS blocks in your input.
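As a rough back-of-the-envelope check, the map task count for a single input file can be estimated by ceiling division of the file size by the block size. This sketch assumes one input split per block; Hadoop's FileInputFormat lets the last split run slightly over the block size, so for some file sizes the real count is one lower. The 128 MB block size and 1 GB file are illustrative assumptions.

```java
// Rough estimate of map tasks for one input file, assuming one split per HDFS block.
public class MapTaskEstimate {

    static long estimatedMapTasks(long fileBytes, long blockBytes) {
        // ceiling division: a partially filled last block still gets its own split
        return (fileBytes + blockBytes - 1) / blockBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long blockSize = 128 * mb;   // assumed block size; check dfs.blocksize on your cluster
        long inputSize = 1024 * mb;  // hypothetical 1 GB input file
        System.out.println(estimatedMapTasks(inputSize, blockSize) + " map tasks"); // prints "8 map tasks"
    }
}
```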

That’s it for today. I hope you have enjoyed my post. Comments are welcome.



About Pranab

I am Pranab Ghosh, a software professional in the San Francisco Bay area. I manipulate bits and bytes for the good of living beings and the planet. I have worked with a myriad of technologies and platforms in various business domains for early stage startups, large corporations and anything in between. I am an active blogger and open source project owner. I am passionate about technology and green and sustainable living. My technical interest areas are Big Data, Distributed Processing, NOSQL databases, Machine Learning and Programming languages. I am fascinated by problems that don't have a neat closed-form solution.
This entry was posted in Hadoop and Map Reduce.

3 Responses to Folding, Cross Validation with Map Reduce

  1. Dipankar says:

    Nice and crisp!

  2. james says:

    This isn’t truly cross-validation.
    Cross-validation prevents a data point from appearing in all training folds, i.e. each data point is used for training at most k-1 times.
    Similarly, it ensures it appears in one (and only one) validation fold i.e. each data point is used once for validation.
    Random sampling ensures neither of the two.

    • Pranab says:

      I described the formal cross validation process in the beginning of my post, which agrees with your comment. As I mentioned in the post, I have taken a simpler approach and deviated from the formal definition. This was done deliberately.
