I came across a question on Stack Overflow recently about calculating web chat room statistics using Hadoop MapReduce. The question was begging for a solution based on MapReduce secondary sort. I will provide details, along with code snippets, to complement my answer to the question.
The Problem
The data consists of a timestamp, a chat room zone and the number of users, logged once per minute. The goal is to find the maximum number of users per day for each zone. The data looks like this:
timestamp; zone; value
2011-01-01 00:00:00; 1; 10
2011-01-01 00:00:00; 2; 22
2011-01-01 00:01:00; 1; 11
The Solution
The brute force solution is for the mapper to emit zone and day as the key and the number of users as the value. The reducer receives all the user counts for a key and finds the maximum by iterating through the value list. But we can do better than that, thanks to Hadoop secondary sort. To reiterate my answer on Stack Overflow, the solution based on secondary sort consists of the following steps.
- Define the key as the concatenation of zone, day and value, i.e., zone:yyyy-mm-dd:value. You don’t even need to emit any value from the mapper; NullWritable is good enough for the value.
- Implement a key comparator such that the zone:yyyy-mm-dd part of the key is ordered ascending and the value part is ordered descending. This ensures that, among all keys for a given zone:yyyy-mm-dd, the first key in the group has the highest value.
- Define the partitioner and grouping comparator of the composite key based on the zone and day part of the key only, i.e. zone:yyyy-mm-dd.
- The reducer input will contain the first key of each key group, which carries the zone, the day and the max value for that zone and day combination. The value part of the reducer input will be a list of NullWritable, which can be ignored.
Implementation
The key emitted by the mapper is a composite key. The zone and day part of the key is the natural key, which is used by the partitioner and the group comparator. When we tag a value onto the natural key, we get a composite key, which gives us the ability to sort by value, because the value becomes part of the key sort. The composite key layout is:
Chat zone | Date (yyyy-mm-dd) | User count (per minute)
The mapper emits the key as defined in step 1 and NullWritable as the value. The mapper output will look like this:
2:2011-01-01:23 -> null
2:2011-01-05:17 -> null
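The mapper's key construction can be sketched in plain Java without any Hadoop dependencies. The class name and the field positions in the log line are my assumptions, based on the sample data above.

```java
// Hypothetical helper illustrating how the mapper could build the
// composite key "zone:yyyy-mm-dd:value" from one input line of the
// form "timestamp; zone; value".
public class CompositeKeyBuilder {
    public static String buildKey(String logLine) {
        String[] fields = logLine.split(";");
        // "2011-01-01 00:00:00" -> "2011-01-01"
        String day = fields[0].trim().split(" ")[0];
        String zone = fields[1].trim();
        String value = fields[2].trim();
        return zone + ":" + day + ":" + value;
    }
}
```

In the real mapper this string would be wrapped in a Text object before being emitted.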
The key comparator implementation for step 2 is as follows. The key comparator operates on the composite key.
public static class KeyComparator extends WritableComparator {
    protected KeyComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        Text t1 = (Text) w1;
        Text t2 = (Text) w2;
        String[] t1Items = t1.toString().split(":");
        String[] t2Items = t2.toString().split(":");
        //ascending zone and day
        String t1Base = t1Items[0] + ":" + t1Items[1];
        String t2Base = t2Items[0] + ":" + t2Items[1];
        int comp = t1Base.compareTo(t2Base);
        //descending value; compare numerically, since a lexicographic
        //comparison would order "9" after "10"
        if (comp == 0) {
            comp = Integer.compare(Integer.parseInt(t2Items[2]),
                                   Integer.parseInt(t1Items[2]));
        }
        return comp;
    }
}
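To see the ordering this comparator produces without spinning up Hadoop, the same comparison logic can be expressed over plain Strings. This is a sketch; the class and method names are mine.

```java
// Plain-String version of the composite-key comparison: ascending on
// zone and day, descending on the numeric value part.
public class KeyOrderDemo {
    public static int compareKeys(String a, String b) {
        String[] x = a.split(":");
        String[] y = b.split(":");
        int comp = (x[0] + ":" + x[1]).compareTo(y[0] + ":" + y[1]);
        if (comp == 0) {
            // arguments swapped for descending order on the value
            comp = Integer.compare(Integer.parseInt(y[2]),
                                   Integer.parseInt(x[2]));
        }
        return comp;
    }
}
```

For the two sample keys above, this ordering puts 2:2011-01-01:23 before 2:2011-01-05:17, and within one zone and day it puts the highest count first.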
The group comparator mentioned in step 3 is needed because we want all keys with the same zone and day to be grouped together. Each invocation of the reducer is for one group of keys. The group comparator implementation is as follows.
public static class GroupComparator extends WritableComparator {
    protected GroupComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        //consider only zone and day part of the key
        Text t1 = (Text) w1;
        Text t2 = (Text) w2;
        String[] t1Items = t1.toString().split(":");
        String[] t2Items = t2.toString().split(":");
        String t1Base = t1Items[0] + ":" + t1Items[1];
        String t2Base = t2Items[0] + ":" + t2Items[1];
        return t1Base.compareTo(t2Base);
    }
}
By default, each key maps to a separate group. Our custom group comparator maps multiple keys to one group. The reducer gets called once per group. When multiple keys map to one group, Hadoop passes the first key of the group to the reducer. Since, in our case, the keys within a group are sorted, we get the desired key in the reducer input.
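The effect of grouping can also be simulated in plain Java: given keys already in secondary-sort order, keeping the first key per zone:day group yields exactly the keys the reducer sees. The class and method names here are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hadoop-free sketch of what grouping buys us: after the secondary
// sort, the first key of each zone:day group already carries the
// maximum value for that zone and day.
public class FirstKeyPerGroup {
    static String naturalKey(String compositeKey) {
        String[] items = compositeKey.split(":");
        return items[0] + ":" + items[1];
    }

    // The input must already be in secondary-sort order.
    public static List<String> firstKeys(List<String> sortedCompositeKeys) {
        Map<String, String> first = new LinkedHashMap<>();
        for (String key : sortedCompositeKeys) {
            first.putIfAbsent(naturalKey(key), key);
        }
        return new ArrayList<>(first.values());
    }
}
```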
In a real Hadoop cluster, many reducers run on different nodes. If the data for the same zone and day doesn’t land in the same reducer after the shuffle, we are in trouble. The way to ensure it does is to take charge of the partitioning logic ourselves. The implementation is as follows.
public static class ZoneDayPartitioner extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text key, NullWritable value, int numPartitions) {
        //consider only zone and day part of the key
        String[] keyItems = key.toString().split(":");
        String keyBase = keyItems[0] + ":" + keyItems[1];
        return keyBase.hashCode() % numPartitions;
    }
}
All the reducer has to do is emit the key. The reducer output will consist of zone, day and the corresponding maximum number of users for that zone and day. Zone and day will be sorted ascending within a particular reducer’s output, but they won’t be globally sorted across all reducer outputs.
Wrapping up
We could get away with emitting null for value in mapper, because we were only interested in the maximum number of users for a day and we essentially carried that value in the key.
If, however, we were interested in finding the top two user counts for a day, then the mapper would have to emit the number of users as the value, in addition to carrying it in the key. The reducer logic would also be different: it would iterate through the values and retain the first two from the list. You might find this post on MapReduce secondary sort an interesting read.
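As a rough, Hadoop-free sketch of that top-two variant (class and method names are mine): because the secondary sort hands the reducer its values in descending order, retaining the first two entries is all the reducer needs to do.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TopTwo {
    // Assumes the iterable is already in descending order, as the
    // secondary sort guarantees for the reducer's value list.
    public static List<Integer> firstTwo(Iterable<Integer> sortedDescValues) {
        List<Integer> top = new ArrayList<>();
        Iterator<Integer> it = sortedDescValues.iterator();
        while (it.hasNext() && top.size() < 2) {
            top.add(it.next());
        }
        return top;
    }
}
```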
I have provided only the key aspects of the implementation. It should be easy to fill in the gaps for the full implementation. As usual, comments are welcome.
For commercial support for any solution in my github repositories, please talk to ThirdEye Data Science Services. Support is available for Hadoop or Spark deployment on cloud, including installation, configuration and testing.
For the case of taking the max number (i.e. top 1), it’s not clear if you are using a combiner or not. Given that max() is both associative and commutative, your reducer and combiner could be the same and it tremendously reduces the volume of data that gets shuffled.
Kondala,
I agree. A combiner could cut down IO in shuffle stage. I wrote this as a quick prototype and hence skipped the combiner.
Pranab
You might be interested in reading more about Cascading. It basically does what you describe above, in a dynamic way – so your Cascading code would look something like:
pipe = new Each(pipe, new RegexParser(pattern));
pipe = new GroupBy(pipe, new Fields("zone", "timestamp"), new Fields("value"));
pipe = new Every(pipe, new First());
That gets compiled into code that configures the Cascading comparator and partitioner to act as needed.
Hi KKrugler – How do you retain fields that are not in the Cascading GroupBy? For example, if I have the fields "zone", "timestamp" and "ip", and group on "zone" and "timestamp", how do I get the "ip" field?
I think you also asked this question on the Cascading list, which I answered there. But the short answer is to split the pipe upstream, then re-combine with the results of finding the max (or count, as per your question). If you know that the max size of any group is small enough, then you can make it more efficient by writing a custom Buffer operation that keeps the set of records (tuples) in memory while it calculates the result, and then emits them with the result as an additional field.
kkrugler:
Thanks for your comments. I am aware of Cascading and like its abstraction of Hadoop data processing flows. However, the intent of my post was to show the nuances of Hadoop secondary sort, and I think Java map reduce is the best way to do that. Hiding them under the abstraction of Cascading does not help.
Pranab
Pingback: Similarity Based Recommendation – Hadoop Way | Mawazo
Pingback: Hadoop by cyavvari - Pearltrees
Pranab, thanks for a great write-up. I followed your instructions and was able to put together a full MapReduce application that secondary sorts on newspaper article name (a unique string), using IP address as the natural part of the key. By not using the group comparator class, I’m able to get a list of IP addresses (sorted lexicographically but not unique) followed by a unique list of all the newspaper articles that IP address viewed. I can now use that output as input to Mahout for a recommendation engine project. Thank you!!! I was stuck on how to do the secondary sort with Text objects.
Brian
That sounds great. If you had used the group comparator, it would have grouped all the article names under the IP address as the base part of the key. I am curious as to why you need the article names to be sorted; Mahout collaborative filtering just requires user, item pairs. BTW, you may be interested in looking at my implementation of a collaborative filtering based recommendation engine on github, https://github.com/pranab/sifarish
Another suggestion is to track the number of times the same article got viewed from the same IP address. That would have served as an implicit rating in the recommendation engine input.
I’m sorting the articles just to make sure I can do it correctly, but Mahout has no need for it, as you say. As part of this exercise I was trying to duplicate some Hive SQL queries we were running in Hadoop. I do track the number of times the same article gets ‘paged’ by the same IP address, and I created a scale to force the preference into a 1, 2 or 3. I’m not sure it needs to be forced into a short scale like that though; what do you think? The pageviews range from 1 to around 30 for some long articles. Also, I’m using item similarity since our item lists are much, much smaller than our user lists, so sparsity is less of an issue. I’ll check out your recommendation engine Pranab, thanks for pointing it out.
I have a question for you. Why do some secondary sort examples use IntPair or TextPair composite keys, when all that is needed is your approach of just creating a concatenated key in the map step? I haven’t yet experimented with joins (that is next up), so maybe it is needed for that. I’m fairly new at some of this, so you can guess some things are foggy still.
Brian
You could scale the page view numbers down using a sigmoid function. For example, for positive x, 10 / (1 + exp(-bx)) – 4 will restrict it to the range 1 .. 6. Although using Text as in my example works, it may not be the most efficient for storage and network transport. It’s preferable to use the native types, and that’s what I generally do. I have been using a Tuple class that I created; it’s a list of any Java native types.
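The scaling suggested above can be written out as follows. This is a sketch; b is a tunable steepness parameter, and the 1 .. 6 range holds for positive x.

```java
public class SigmoidScale {
    // 10 / (1 + exp(-b*x)) - 4: maps x = 0 to 1 and approaches 6 as
    // x grows, compressing large page-view counts into a small range.
    public static double scale(double x, double b) {
        return 10.0 / (1.0 + Math.exp(-b * x)) - 4.0;
    }
}
```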
Thanks for this post, very helpful!!!
I want to mention, though, that Java’s standard String.hashCode() does not necessarily return a positive integer, and further, Java’s modulus definition does not guarantee a positive remainder either (yes, I was surprised by this as well); thus the calculation in the partitioner [int part = keyBase.hashCode() % numPartitions;] can lead to assigning keys to negative partition numbers, which will not work.
Do a Math.abs() or just add numPartitions when part < 0.
Pumuckl
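Pumuckl’s fix can be captured in a small plain-Java helper (the names here are illustrative; Hadoop’s own HashPartitioner avoids the same issue by masking the sign bit with (hash & Integer.MAX_VALUE) % numPartitions):

```java
public class SafePartition {
    // Math.abs guards against a negative hashCode or remainder; the
    // result is always a valid partition index in [0, numPartitions).
    public static int partition(String keyBase, int numPartitions) {
        return Math.abs(keyBase.hashCode() % numPartitions);
    }
}
```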
Thanks. Very good point. I found out the hard way the other day in one of my map reduce projects. I should go through all my MR projects and fix this.
Could you please provide me the full source code of this program?
Sebin
Unfortunately, I don’t have the source code. Please take a look at the MR code from my OS project visitante for web click stream analytics, https://github.com/pranab/visitante and look for the Java class SessionExtractor.
It does something similar, grouping web click stream data by session ID and secondary sorting by the visit time for a page. Between that and the code snippet here in this blog, you should be all set.
Hi Pranab,
This is Ramesh Narasingu. I have newly started learning Big Data analysis. I have learned Hadoop HDFS and MapReduce concepts, but I don’t have any practical, hands-on experience. Please give me a full, practice-oriented MapReduce example and how to execute it on the Cloudera training CentOS system. Please reply as early as possible.
Ramesh,
I have several open source Hadoop projects on github. Here is the link
https://github.com/pranab
Thanks Pranab
Finally found an explanation what the group comparator does. Thanks.
Dina,
I am glad you found it. Good luck with secondary sorting
A rather informative post. But I am facing an issue. I followed the steps you have mentioned, but my MapReduce program does not seem to be entering the reduce function; the output is getting dumped to HDFS after the map stage. I have made sure that job.setReducerClass() is set properly. Could you comment on what the problem could be?
Aseem
You will find plenty of examples in any of my Hadoop open source projects listed below
https://github.com/pranab/sifarish
https://github.com/pranab/beymani
https://github.com/pranab/visitante
Good luck
Pranab
Pingback: What is the point of using a Partitioner for Secondary Sorting in MapReduce? | CopyQuery
Hi Pranab,
Thanks for the detailed explanation of each custom implementation. I recently started learning Big Data concepts and am confused about the shuffle/sort and partitioner concepts. If you get a chance, could you please explain what exactly the shuffle does?
Venkata
It’s hard to explain in a few sentences. This page from the Yahoo Hadoop tutorial should help
http://developer.yahoo.com/hadoop/tutorial/module4.html
Pranab
Thanks Pranab.
I went through the above example and implemented it using the brute force method, since I want the output to show on which day, per zone, the maximum number of users logged in. That is:
Input
Zone Day users
1 01-15-2014 5
1 01-16-2014 10
2 01-18-2014 25
2 01-19-2014 9
Reducer Output:
Zone Day users
1 01-16-2014 10
2 01-18-2014 25
I am able to get the maximum number of users for each day and zone, but not for a single specific day per zone.
How can I control the reducer keys in the reduce method?
Thanks
Srujana.
Pingback: Diversity in Personalization with Attribute Diffusion | Mawazo