Map Reduce Secondary Sort Does It All


I came across a question on Stack Overflow recently about calculating web chat room statistics with Hadoop map reduce. The question was begging for a solution based on map reduce secondary sort. Here I provide the details, along with code snippets, to complement my answer to the question.

The Problem

The data consists of a timestamp, a chat room zone and the number of users, logged once per minute. The goal is to find the maximum number of users per day for every zone. The data looks as below.

timestamp; zone; value
2011-01-01 00:00:00; 1; 10
2011-01-01 00:00:00; 2; 22
2011-01-01 00:01:00; 1; 11

The Solution

The brute force solution is for the mapper to emit zone and day as the key and the number of users as the value. The reducer will receive all the user counts for a key and find the maximum by iterating through the value list. But we can do better than that, thanks to Hadoop secondary sort. To reiterate my answer on Stack Overflow, the solution based on secondary sort consists of the following steps.

  1. Define the key as the concatenation of the zone, the day part of the timestamp and the value, i.e., zone:yyyy-mm-dd:value. You don't even need to emit any value from the mapper; NullWritable is good enough for the value.
  2. Implement a key comparator such that the zone:yyyy-mm-dd part of the key is ordered ascending and the value part is ordered descending. This ensures that, among all keys for a given zone:yyyy-mm-dd, the first key in the group will have the highest value.
  3. Define the partitioner and the grouping comparator of the composite key based on the zone and day part of the key only, i.e., zone:yyyy-mm-dd.
  4. The reducer input will contain the first key of each key group, which carries the zone, the day and the maximum value for that zone and day combination. The value part of the reducer input will be a list of NullWritable and can be ignored.

Implementation

The key emitted by the mapper is a composite key. The zone and day part of the key is the natural key, which is used by the partitioner and the group comparator. When we tag a value onto the natural key, we get a composite key, which gives us the ability to sort by value, because the value becomes part of the key being sorted. The composite key is laid out as follows.

Chat zone : Date (year, month and day) : User count (per minute)

The mapper emits the key as defined in step 1 and NullWritable as the value. The mapper output will look like this.

2:2011-01-01:23 -> null
2:2011-01-05:17 -> null
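
For completeness, a minimal mapper along these lines might look as follows. This is a sketch; it assumes the semicolon separated input format shown earlier, the usual org.apache.hadoop.io and org.apache.hadoop.mapreduce imports, and illustrative class names.

public static class MaxUserMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Text outKey = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // input line, e.g.: 2011-01-01 00:00:00; 1; 10
        String[] items = line.toString().split(";");
        String day = items[0].trim().split(" ")[0];   // yyyy-mm-dd part of the timestamp
        String zone = items[1].trim();
        String count = items[2].trim();
        // composite key as per step 1: zone:yyyy-mm-dd:value
        outKey.set(zone + ":" + day + ":" + count);
        context.write(outKey, NullWritable.get());
    }
}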

The key comparator, as defined in step 2, is implemented as follows. It operates on the full composite key, including the value part.


public static class KeyComparator extends WritableComparator {

    protected KeyComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        Text t1 = (Text) w1;
        Text t2 = (Text) w2;
        String[] t1Items = t1.toString().split(":");
        String[] t2Items = t2.toString().split(":");

        // ascending on the zone and day part of the key
        String t1Base = t1Items[0] + ":" + t1Items[1];
        String t2Base = t2Items[0] + ":" + t2Items[1];
        int comp = t1Base.compareTo(t2Base);

        // descending on the value part; the counts are compared numerically,
        // since a plain string comparison would order "9" above "10"
        if (comp == 0) {
            comp = Integer.valueOf(t2Items[2]).compareTo(Integer.valueOf(t1Items[2]));
        }
        return comp;
    }
}

The group comparator, as mentioned in step 3, is needed because we want all the keys with the same zone and day to be grouped together. Each invocation of the reducer handles one group of keys. The group comparator is implemented as follows.


public static class GroupComparator extends WritableComparator {

    protected GroupComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        // consider only the zone and day part of the key
        Text t1 = (Text) w1;
        Text t2 = (Text) w2;
        String[] t1Items = t1.toString().split(":");
        String[] t2Items = t2.toString().split(":");
        String t1Base = t1Items[0] + ":" + t1Items[1];
        String t2Base = t2Items[0] + ":" + t2Items[1];
        return t1Base.compareTo(t2Base);
    }
}

By default, each key maps to a separate group. Our custom group comparator maps multiple keys to one group. The reducer gets called once per group. When multiple keys are mapped to one group, Hadoop passes the first key of the group to the reducer. Since, in our case, the keys within a group are sorted with the value descending, the reducer receives exactly the key we want.
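
To illustrate with made-up numbers, suppose the sorted keys arriving at one reducer are the following.

1:2011-01-01:11 -> null
1:2011-01-01:10 -> null
1:2011-01-02:15 -> null
1:2011-01-02:9 -> null

The group comparator collapses these four keys into two groups, so the reducer gets invoked twice, with 1:2011-01-01:11 and 1:2011-01-02:15 as the input keys, each already carrying the maximum count for its zone and day.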

In a real Hadoop cluster, many reducers run on different nodes. If the data for the same zone and day doesn't land in the same reducer after the map reduce shuffle, we are in trouble. The way to ensure that it does is to take charge and define our own partitioning logic. The implementation is as follows.


public static class ZoneDayPartitioner extends Partitioner<Text, NullWritable> {

    @Override
    public int getPartition(Text key, NullWritable value, int numPartitions) {
        // consider only the zone and day part of the key
        String[] keyItems = key.toString().split(":");
        String keyBase = keyItems[0] + ":" + keyItems[1];
        // mask the sign bit, since hashCode() may be negative and a plain
        // modulo would then produce an invalid negative partition number
        // (see Pumuckl's comment below)
        return (keyBase.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

All the reducer has to do is simply emit the key. The reducer output will consist of the zone, the day and the corresponding maximum user count for that zone and day. The zone and day will be sorted ascending within a particular reducer's output, but they won't be globally sorted across all reducer outputs.
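
A minimal reducer sketch for this scheme, with an illustrative class name:

public static class MaxUserReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // the one key delivered for the group already carries the zone,
        // the day and the maximum user count, so just emit it
        context.write(key, NullWritable.get());
    }
}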

Wrapping up

We could get away with emitting null for the value in the mapper, because we were only interested in the maximum number of users for a day and we essentially carried that value in the key.

If, however, we were interested in finding the top two user counts for a day, then the mapper would have to emit the number of users as the value, in addition to carrying it in the key. The reducer logic would also be different: it would iterate through the values and retain the first two from the list, as sketched below. You might find this post on map reduce secondary sort an interesting read.
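
Here is a sketch of that variant's reducer, assuming the mapper emits the user count as an IntWritable value alongside the composite key.

public static class TopTwoReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // values arrive sorted descending, courtesy of the key comparator,
        // so the first two are the top two counts for this zone and day
        String[] keyItems = key.toString().split(":");
        Text outKey = new Text(keyItems[0] + ":" + keyItems[1]);
        int emitted = 0;
        for (IntWritable count : values) {
            context.write(outKey, count);
            if (++emitted == 2) {
                break;
            }
        }
    }
}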

I have provided only the key aspects of the implementation; it should be easy to fill in the gaps for the full implementation. A sketch of the driver wiring is below. As usual, comments are welcome.
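
This is roughly how the pieces would be wired together in the driver. It is a sketch, not the full job class; MaxUserJob is a hypothetical enclosing class and the other names refer to the snippets above.

Configuration conf = new Configuration();
Job job = new Job(conf, "chat zone daily max users");
job.setJarByClass(MaxUserJob.class);
job.setMapperClass(MaxUserMapper.class);
job.setReducerClass(MaxUserReducer.class);
job.setPartitionerClass(ZoneDayPartitioner.class);
job.setSortComparatorClass(KeyComparator.class);         // step 2: sort order of composite keys
job.setGroupingComparatorClass(GroupComparator.class);   // step 3: group by zone and day
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);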

For commercial support for any solution in my github repositories, please talk to ThirdEye Data Science Services. Support is available for Hadoop or Spark deployment on the cloud, including installation, configuration and testing.

About Pranab

I am Pranab Ghosh, a software professional in the San Francisco Bay area. I manipulate bits and bytes for the good of living beings and the planet. I have worked with a myriad of technologies and platforms in various business domains for early stage startups, large corporations and anything in between. I am an active blogger and open source project owner. I am passionate about technology and green and sustainable living. My technical interest areas are Big Data, Distributed Processing, NOSQL databases, Machine Learning and Programming languages. I am fascinated by problems that don't have a neat closed form solution.

30 Responses to Map Reduce Secondary Sort Does It All

  1. Kondala says:

    For the case of taking the max number (i.e. top 1), it’s not clear if you are using a combiner or not. Given that max() is both associative and commutative, your reducer and combiner could be the same and it tremendously reduces the volume of data that gets shuffled.

  2. pkghosh says:

    Kondala,

    I agree. A combiner could cut down IO in shuffle stage. I wrote this as a quick prototype and hence skipped the combiner.

    Pranab

  3. kkrugler says:

    You might be interested in reading more about Cascading. It basically does what you describe above, in a dynamic way – so your Cascading code would look something like:

    pipe = new Each(pipe, new RegexParser(pattern));
    pipe = new GroupBy(pipe, new Fields("zone", "timestamp"), new Fields("value"));
    pipe = new Every(pipe, new First());

    That gets compiled into code that configures the Cascading comparator and partitioner to act as needed.

    • Manoj says:

      Hi KKrugler, how do I retain other fields that are not in the Cascading group by? For example, if I have the fields "zone", "timestamp" and "ip" and group on "zone" and "timestamp", how do I get the "ip" field?

      • kkrugler says:

        I think you also asked this question on the Cascading list, which I answered there. But the short answer is to split the pipe upstream, then re-combine with the results of finding the max (or count, as per your question). If you know that the max size of any group is small enough, then you can make it more efficient by writing a custom Buffer operation that keeps the set of records (tuples) in memory while it calculates the result, and then emits them with the result as an additional field.

  4. pkghosh says:

    kkrugler:

    Thanks for your comments. I am aware of Cascading and like its abstraction of Hadoop data processing flows. However, my intent in this post was to show the nuances of Hadoop secondary sort, and I think plain Java map reduce is the best way to do that. Hiding them under the abstraction of Cascading does not help.

    Pranab

  5. Pingback: Similarity Based Recommendation – Hadoop Way | Mawazo

  6. Pingback: Hadoop by cyavvari - Pearltrees

  7. Brian Hether says:

    Pranab, thanks for a great write-up. I followed your instructions and was able to put together a full map reduce application that secondary sorts on newspaper article name (a unique string), using IP address as the first, natural part of the key. By not using the group comparator class, I'm able to get a list of IP addresses (sorted lexicographically but not unique) followed by a unique list of all the newspaper articles that IP address had page views on. I can now use that output as input to Mahout for a recommendation engine project. Thank you! I was stuck on how to do the secondary sort with Text objects.

  8. Pranab says:

    Brian

    That sounds great. If you had used the group comparator, it would have grouped all the article names under the IP address as the base part of the key. I am curious as to why you need the article names to be sorted; Mahout collaborative filtering just requires user, item pairs. BTW, you may be interested in looking at my implementation of a collaborative filtering based recommendation engine on github, https://github.com/pranab/sifarish

    Another suggestion is to track the number of times the same article got viewed from the same IP address. That would have served as an implicit rating in the recommendation engine input.

    • Brian Hether says:

      I'm sorting the articles just to make sure I can do it correctly, but Mahout has no need for it, as you say. As part of this exercise I was trying to duplicate some Hive SQL queries we were running in Hadoop. I do track the number of times the same article gets 'paged' by the same IP address and created a scale to force the preference into a 1, 2 or 3. I'm not sure it needs to be forced into a short scale like that though; what do you think? The page views range from 1 to around 30 for some long articles. Also, I'm using item similarity, since our item lists are much, much smaller than our user lists, so sparsity is less of an issue. I'll check out your recommendation engine Pranab, thanks for pointing it out.

      I have a question for you: why do some secondary sort examples use IntPair or TextPair composite keys, when all that is needed is your approach of just creating a concatenated key in the map step? I haven't yet experimented with joins (that is next up), so maybe it is needed for that. I'm fairly new at some of this, so you can guess some things are still foggy.

  9. Pranab says:

    Brian
    You could scale the page view numbers down using a sigmoid function. For example, (10 / (1 + exp(-bx))) - 4 will restrict it to the range 1 to 6. Although using Text as in my example works, it may not be the most efficient for storage and network transport. It's preferable to use the native types, and that's what I generally do. I have been using a Tuple class that I created; it's a list of any Java native types.
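
    In Java, that scaling would be a one-liner, sketched here with a made-up steepness parameter b:

    double b = 0.2;                // hypothetical steepness parameter, to be tuned
    double x = pageViewCount;      // raw page view count for a user and article
    double rating = 10.0 / (1.0 + Math.exp(-b * x)) - 4.0;   // 1 at x = 0, approaching 6 as x grows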

  10. Pumuckl says:

    Thanks for this post, very helpful!!!
    I want to mention, though, that Java’s standard String.hashCode() function not necessarily returns a positive integer, and further Java’s modulus definition does not guarantee a positive remainder either (yes, I was surprised by this as well); thus the calculation in the partitioner [int part = keyBase.hashCode() % numPartitions;
    ] can lead to assigning keys to negative partitioners, which will not work.
    Do a Math.abs() or just add NumPartitions when part<0
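
    In code, the second suggestion would look like this (a sketch; it also sidesteps the Math.abs(Integer.MIN_VALUE) edge case, which is itself negative):

    int part = keyBase.hashCode() % numPartitions;
    if (part < 0) {
        part += numPartitions;   // shift a negative remainder into the valid range
    }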

  11. Pranab says:

    Pumuckl

    Thanks, very good point. I found out the hard way the other day in one of my map reduce projects. I should go through all my MR projects and fix this.

  12. Sebin John says:

    Could you please provide me the full source code of this program?

    • Pranab says:

      Sebin

      Unfortunately, I don't have the full source code. Please take a look at this MR code from my open source project visitante for web click stream analytics at https://github.com/pranab/visitante and look for the Java class SessionExtractor.

      It does something similar, grouping web click stream data by session ID and secondary sorting by visit time for a page. Between that and the code snippets here in this blog, you should be all set.

  13. ramesh.narasingu says:

    Hi Pranab,
    This is Ramesh Narasingu. I am new to big data analysis; I have learned the Hadoop HDFS and map reduce concepts, but I don't have any practical, hands-on experience. Please give me a fully practical map reduce example, and show how to execute the CentOS commands in cloudera_training. Please reply as early as possible.

  14. Dima says:

    Finally found an explanation of what the group comparator does. Thanks.

  15. Aseem says:

    A rather informative post, but I am facing an issue. I followed the steps you have mentioned, but my map reduce program does not seem to be entering the reduce function; the output is getting dumped to HDFS after the map stage. I have made sure that job.setReducerClass() is set properly. Could you comment on what the problem could be?

  16. Pingback: What is the point of using a Partitioner for Secondary Sorting in MapReduce? | CopyQuery

  17. venkata says:

    Hi Pranab,

    Thanks for the detailed explanation of each custom implementation. I recently started learning big data concepts and am confused about the shuffle/sort and partitioner concepts. If you get a chance, could you please explain what exactly the shuffle does?

  18. Pranab says:

    Venkata
    It's hard to explain in a few sentences. This page from the Yahoo Hadoop tutorial should help:
    http://developer.yahoo.com/hadoop/tutorial/module4.html
    Pranab

  19. venkata says:

    Thanks Pranab.

    I went through the above example and implemented it in the brute force method, since I want to get as output the day and zone on which the maximum number of users logged in. That is:

    Input
    Zone Day users
    1 01-15-2014 5
    1 01-16-2014 10
    2 01-18-2014 25
    2 01-19-2014 9

    Reducer Output:
    Zone Day users
    1 01-16-2014 10
    2 01-18-2014 25

    I am able to get the maximum number of users for each day and zone, but not for a specific day and zone.
    How can I control the reducer keys in the reduce method?

    Thanks
    Srujana.

  20. Pingback: Diversity in Personalization with Attribute Diffusion | Mawazo
