Personalized recommendations are ubiquitous in social network and shopping sites these days. How do they do it? As long as enough user interaction data is available for items, e.g., products in a shopping site, a recommendation engine based on what’s known as Collaborative Filtering is not that difficult to build.

Full implementation of my Hadoop based recommendation engine *sifarish* is hosted on github.

## My Approach

I will follow a technique called Item Based Collaborative Filtering. The basic idea is simple and it involves two main steps.

First, based on users’ interest levels in products, we find the correlation between similar products. Second, based on the target user’s current set of items of interest, and using the results of the first step, we recommend a ranked list of new items.

Let’s take a small digression into algorithm complexity and big O notation. Since we are interested in finding the correlation between pairs of items, the complexity is O(n x n). If a shopping site has 500,000 products, we potentially face on the order of 250 billion computations. Granted, the correlation data will be sparse, because it’s unlikely that every pair of items will have some user interested in both. Still, it’s challenging to process this amount of data. Since user interest in products changes with time, the correlation result has a temporal aspect to it: the correlation calculation needs to be done periodically so that the results stay up to date. Since the correlation calculation lends itself well to the divide and conquer pattern, we will use Hadoop.

In this post I will focus on the correlation calculation between pairs of items. In a follow up post, I will go over how to use the correlation data to make the recommendations.

## Product Rating

What we want to correlate is the rating for different products, i.e., we are going to use ratings to find similarities between products. If products are explicitly rated by users, e.g., on a scale of 1-5, we can use that number directly. Let’s assume we don’t have any such rating data. The site may not offer a product rating feature, and even if the feature is available, visitors may simply ignore it.

Instead, we take a more intuitive, pragmatic approach by monitoring user behavior.

We will come up with a product interest level by tracking how many times a product detail page has been viewed by a user, whether the product was put in the shopping cart, and, most importantly, whether the user bought the product. These behaviors represent increasing levels of interest, in that order.

The following table summarizes the rating logic.

| User behavior | Rating |
| --- | --- |
| 1-2 product views | 1 |
| 3-5 product views | 2 |
| More than 5 product views | 3 |
| Product in shopping cart | 4 |
| Product bought | 5 |

Depending on the nature of a user’s interaction with a product, it will be rated on the scale of 1-5, as per the table above. We could make this more sophisticated by taking into account parameters like the amount of time spent on the product page, how recent the user behavior is, etc.
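The table’s logic can be sketched as a small standalone method; the class and method names here are mine, not from sifarish:

```java
// Sketch of the rating table above as code; naming is mine, not sifarish's
public class ImplicitRating {
    // maps observed user behavior for one product to a 1-5 rating
    public static int rate(int pageViews, boolean inCart, boolean bought) {
        if (bought) return 5;          // strongest signal of interest
        if (inCart) return 4;
        if (pageViews > 5) return 3;
        if (pageViews >= 3) return 2;
        return 1;                      // 1-2 product views
    }
}
```

The stronger signals override the weaker ones, matching the increasing interest levels described above.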

## Rating Correlation

We will calculate the rating correlation between products using the Pearson correlation coefficient, as follows. This is not the only way to find similarities, but the Pearson correlation coefficient is one of the most popular.

```
c(i,j)    = cov(i,j) / (stddev(i) * stddev(j))
cov(i,j)  = sum((r(u,i) - av(r(i))) * (r(u,j) - av(r(j)))) / n
stddev(i) = sqrt(sum((r(u,i) - av(r(i))) ** 2) / n)
stddev(j) = sqrt(sum((r(u,j) - av(r(j))) ** 2) / n)
```

The covariance can also be expressed in this alternative form, which we will be using:

```
cov(i,j)  = sum(r(u,i) * r(u,j)) / n - av(r(i)) * av(r(j))
```

where

```
c(i,j)    = Pearson correlation coefficient between products i and j
cov(i,j)  = covariance of ratings for products i and j
stddev(i) = std deviation of rating for product i
stddev(j) = std deviation of rating for product j
r(u,i)    = rating of user u for product i
av(r(i))  = average rating for product i over all users that rated it
sum       = sum over all users
n         = number of data points
```

The numerator, cov(i,j), indicates the correlation between the ratings of two products by a user. When both product ratings are above average, the product of the two deviation terms is positive. When both product ratings are below average, the product of the two deviation terms is also positive.

The denominator is the product of the std deviations of the ratings for products i and j. It normalizes the correlation coefficient, which always lies between -1 and 1. It takes the value 1 if the two products i and j are rated identically by all users. A positive correlation indicates that a high rating for i tends to be associated with a high rating for j, and a low rating for i with a low rating for j.

If there are p products, the correlation coefficients can be thought of as a p x p matrix. The matrix is likely to be sparse, because many of the cells in the matrix may not have any value.
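As a sanity check on the formulas, here is a small standalone sketch, not the sifarish implementation, that computes c(i,j) from two arrays of co-ratings using the alternative covariance form:

```java
import java.util.Arrays;

// Standalone sketch of the Pearson coefficient between two products.
// ri[u] and rj[u] are the ratings of user u for products i and j.
public class PearsonCorrelation {
    public static double correlate(double[] ri, double[] rj) {
        int n = ri.length;
        double mi = Arrays.stream(ri).average().orElse(0);
        double mj = Arrays.stream(rj).average().orElse(0);
        // cov(i,j) = sum(r(u,i) * r(u,j)) / n - av(r(i)) * av(r(j))
        double cov = 0;
        for (int u = 0; u < n; ++u) cov += ri[u] * rj[u];
        cov = cov / n - mi * mj;
        // population std deviation of each rating series
        double si = 0, sj = 0;
        for (int u = 0; u < n; ++u) {
            si += (ri[u] - mi) * (ri[u] - mi);
            sj += (rj[u] - mj) * (rj[u] - mj);
        }
        si = Math.sqrt(si / n);
        sj = Math.sqrt(sj / n);
        return cov / (si * sj);
    }
}
```

Two identically rated products come out at 1, and two oppositely rated products at -1, matching the normalization argument above.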

## Hadoop Processing

Essentially, we have to generate the product of ratings for every pair of products rated by some user.

We have two kinds of input. The first one contains the mean and std deviation of ratings for all the products, as shown below. For reasons explained later, this input needs to be in the format of one row per product Id pair, followed by the mean and std deviation of the rating for each product in the pair.

| pid1 | pid2 | m1 | s1 | m2 | s2 |
| --- | --- | --- | --- | --- | --- |
| xxx | xxx | xxx | xxx | xxx | xxx |

Here *pid* is the product Id, *m* is the mean rating, and *s* is the std deviation of the rating. When a row of this input is processed by the mapper, it will emit the *pid* pair as the key and the rest as the value.

I am taking a digression to explore how we can generate such data, again using map reduce. Given a list of product Ids with the associated mean and std dev of rating, how can we generate such a pairwise list? There are too many combinations. How can we reduce the scale of the problem?

The key is to think in terms of divide and conquer. If we use digits and lower and upper case alphabets for product Ids, and group the data by unique combinations of the first character of the product Id, then we divide up the problem.

There will be approximately 1800 such groups, and each group will only have those product Ids that start with the corresponding characters. For example, the group keyed by “a3” will only have product Ids that start with either “a” or “3”.

Within each group it’s easier to create the unique product Id pairs, because we are dealing with smaller sets of data. Finally, we combine the results for the individual groups to get the final list, which will contain all unique product Id pairs.
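Under these assumptions, the 62 alphanumeric characters (26 + 26 + 10) yield 62 * 61 / 2 = 1891 group keys, the “approximately 1800” above. The grouping can be sketched as follows; the class and method names are mine:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the first-character grouping described above; naming is mine
public class PairGroupKeys {
    static final String CHARS =
        "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";

    // all unordered pairs of distinct characters: 62 * 61 / 2 = 1891 keys
    public static List<String> allKeys() {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < CHARS.length(); ++i)
            for (int j = i + 1; j < CHARS.length(); ++j)
                keys.add("" + CHARS.charAt(i) + CHARS.charAt(j));
        return keys;
    }

    // the group keys a product id belongs to: every key containing its
    // first character, i.e., 61 keys per product id
    public static List<String> keysFor(String pid) {
        char first = pid.charAt(0);
        List<String> keys = new ArrayList<>();
        for (String key : allKeys())
            if (key.indexOf(first) >= 0) keys.add(key);
        return keys;
    }
}
```

Two product Ids starting with different characters meet in exactly one group; a pair sharing its first character meets in many groups, so such duplicates would need a dedicated convention or a dedup step.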

The other input contains ratings for all users, with one row per user. Each row contains the user Id followed by multiple pairs of product Id and rating.

```
uid | pid | rating | pid | rating
uid | pid | rating | pid | rating | pid | rating
uid | pid | rating | pid | rating
```

Here *uid* is the user Id and *pid* is the product Id. Each row will have a variable number of product Id and rating pairs, depending on how many products have rating data for a given user. When a row of this input is processed by the mapper, it emits multiple key-value pairs: all possible pairs of *pid* are enumerated, each pair is emitted as a key, and the corresponding product of the two ratings is emitted as the value.

For any *pid* pair, the grouping needs to be done in such a way that the first value in the list of values in the reducer input will be the means and std deviations for the two products. Subsequent values are the products of ratings for the two products.

## Mapper

The mapper works as follows. The key has three tokens: two *pid* values followed by 0 or 1. We will be using a custom group comparator, with 0 or 1 appended to the key, so that for any given *pid* pair, the value containing the means and std devs will appear before all the values containing the rating products in the reducer input. That is the reason for appending “0” or “1” to the key.

Here is some example mapper output. The first row shows an example of the means and std devs for a *pid* pair. The next two rows are examples of products of ratings for *pid* pairs. The key consists of two *pid* values followed by 0 or 1, depending on the record type.

| Key | Value |
| --- | --- |
| agtr63t5,gh7e5ja6,0 | 2.7,.09,3.1,.06 |
| h5s8fu36,hr67d03k,1 | 15 |
| y836djf8,3j78du3i,1 | 6 |

However, for grouping purposes only the first two tokens will be used, so that all the data for a given *pid* pair will be fed to the reducer in one call.
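Ignoring Hadoop types, the pairing and key construction for the rating input can be sketched like this, with plain strings in place of Text and my own naming:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the second mapper's key/value construction; naming is mine
public class RatingMapperSketch {
    // for one user's row, emits one ("pid1,pid2,1", ratingProduct) pair
    // per pid pair; pids are ordered so both inputs agree on the key
    public static List<String[]> map(String[] pids, int[] ratings) {
        List<String[]> out = new ArrayList<>();
        for (int i = 0; i < pids.length; ++i) {
            for (int j = i + 1; j < pids.length; ++j) {
                String p1 = pids[i], p2 = pids[j];
                // canonical order; the rating product is commutative
                if (p1.compareTo(p2) > 0) { String t = p1; p1 = p2; p2 = t; }
                out.add(new String[] {
                    p1 + "," + p2 + ",1",
                    String.valueOf(ratings[i] * ratings[j]) });
            }
        }
        return out;
    }
}
```

The mapper for the mean/std dev input would emit the same two-pid key with the “0” suffix, so its record sorts ahead of the rating products.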

## Reducer

Here is an example reducer input. The first element in the list of values is the means and std devs of ratings for the *pid* pair in the key. The following values in the list are the products of ratings corresponding to the *pid* pair in the key.

| Key | Value list |
| --- | --- |
| audr63t5,5h78dja6 | [2.8, .09, 2.1, .30] [6] [15] [20] [9] |

Every call to the reducer will create output text consisting of 3 comma separated tokens: pid1, pid2, and the correlation.
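That per-call computation can be sketched as follows. This is a standalone sketch with my own naming, not the sifarish reducer; the first value carries (m1, s1, m2, s2), and the rest are the per-user rating products:

```java
// Sketch of the reducer's per-pair correlation computation; naming is mine
public class CorrelationReducerSketch {
    public static double correlate(double m1, double s1, double m2, double s2,
                                   int[] ratingProducts) {
        double sum = 0;
        for (int p : ratingProducts) sum += p;
        int n = ratingProducts.length;
        // alternative covariance form:
        // cov(i,j) = sum(r(u,i) * r(u,j)) / n - av(r(i)) * av(r(j))
        double cov = sum / n - m1 * m2;
        return cov / (s1 * s2);
    }
}
```

This is exactly why the alternative covariance form is used: the reducer only needs the precomputed means and std deviations plus a running sum of rating products, not the individual ratings.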

## Partitioning

Unless we implement a partitioner, there is no guarantee that all the data for the same product Id pair will go to the same reducer in the Hadoop cluster.

We need to write a partitioner based on the first 2 tokens of the key, i.e., the two *pid* values. Here is the implementation:

```java
@Override
public int getPartition(Text key, Text value, int numReducer) {
    // returns a string containing comma separated pid values, excluding
    // the last token, which is 0 or 1
    Text pidPair = getPidPair(key);
    // mask off the sign bit, since hashCode() may be negative and the
    // partition number must be non-negative
    return (pidPair.hashCode() & Integer.MAX_VALUE) % numReducer;
}
```

The return value determines which of the *numReducer* reducers will process the data for a given pair of *pid* values.

## Group Comparator

We need to ensure that all the data for a given *pid* pair gets fed into the reducer in one call. To do that, we need to take control away from Hadoop’s default grouping and into our own hands.

Just like the partitioner, the group comparator is based on the first two tokens of the key, as shown below:

```java
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
    // compare only the pid pair, ignoring the trailing 0/1 token
    Text pidPair1 = getPidPair((Text) w1);
    Text pidPair2 = getPidPair((Text) w2);
    return pidPair1.compareTo(pidPair2);
}
```

Our custom group comparator will group the data for each *pid* pair before feeding it to the reducer.

## Final Thoughts

We are most of the way through our recommendation engine. At this point we have several reducer output files, where each row contains two *pid* values and the corresponding correlation coefficient, for all product pairs for which rating data is available.

Armed with the correlation values and the ratings for a target user, we can make product recommendations for the user. But that’s another map reduce job. I will save that for another day and another post.

## Further Reading

If you want to learn more about Recommendation Engine and Collaborative Filtering, here is a good place to start.

For those interested in Hadoop and Map Reduce, here is an excellent tutorial on the Yahoo Developer Network.

## Update

For the implementation, please refer to this post for the map reduce workflow for the complete solution and this one for Pearson correlation map reduce. The final implementation differs from the solution proposed here, because I had to accommodate multiple correlation algorithms, of which Pearson correlation is one.

Please refer to this tutorial for the complete work flow involving all the MR jobs. Some of the MR jobs in the work flow are optional.


First of all, thanks for a very helpful and informative article.

I was going through it and saw what you have written about deriving the first input for the final map reduce, that is:

“The key is to think in terms of divide and conquer. If we use digits and lower and upper case alphabets for product, and then group the data by unique combinations of the first character of the product Id then we divide up the problem. There will be approximately 1800 such groups and each group will only have those product Ids that start with the corresponding characters. For example, the group keyed by “a3” will only have product Id’s that start with either “a” or “3”.

Within each group it’s easier create the unique product Id pairs, because we are dealing with smaller sets of data. Finally, we combine the results for the individual groups to get final list with will contain all unique product Id pairs.”

I didn’t fully understand the logic of reaching the unique combinations of products using MR jobs.

Are you considering the first 2 chars for the first run to get the unique combinations for that group? I can understand that, but when we get multiple groups and create unique combinations of products and write them to a file (from the reducer), how have you merged these files containing the combinations of product ids?

Plus, how did you reach the number 10080 for 26 (small alpha) + 26 (capital) + 10 (numbers)…

Please explain, as I’m trying to write some algorithms for a recommendation engine.

So, it will be helpful to understand this as well.

Once again thanks for your article.

Regards,

Manish

Sorry, the number you mention is 1800 and not 10080. I think it should be calculated as a combination problem, where we have 56 (alphanumeric) choices and we need to pick unique combinations of 2 chars from it.

So, n=56, r=2, nCr = n!/(r!(n-r)!) = 56!/(2! * 54!) = 56*55/2 = 1540 combinations.

Is that correct? Plus, does that mean we need to call the MR job to get the unique product combinations that many times, as each time the mapper will use 2 chars to define the key and give input to the reducer, and the reducer creates the unique combinations within each group?

If this is the case, we need to tell the map job dynamically what the key would be (a3 = a or 3, 4b = 4 or b) when we run it using JobConf.

Regards,

Manish

Did further reading and found that Configuration/JobConf can be used to pass params to the job; in the mapper they can be accessed from the method setup(Mapper.Context ctx), and getConfiguration() can be used to get the conf.

On the Configuration, get(key) and set(key, val) can be used.

Regards,

Manish

Manish

Thanks for reading my post and asking the questions. I kind of glossed over the MR job that outputs the product pair data.

The input to this MR job is pid, mean, stddev. Our goal is to generate all possible pairwise combinations. Using the first char of the pid, there are 1540 combinations. You are right; I was doing a quick estimate. It’s basically 56 * 55 / 2.

The setup method of the mapper could generate these possible 1540 keys and store them as a class member variable. When a new row is processed, all the mapper has to do is find all the keys, among those 1540, that contain the first char of the pid field of the current row. There will be 55 such keys, because a given char will be combined with 55 other chars. For each such key, the mapper will emit the key and the row as the value. Essentially we are doing a grouping operation.

On the reducer side, there will be 1540 keys, and the reducer will be invoked for each key. The values for a key will be all the records whose pid’s first character is part of the key.

If there are 150,000 products, each group will have on average about 100 values. The reducer can generate pairwise records for those 100 values. It’s basically a nested loop through all the values.
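That nested loop can be sketched like this (a standalone sketch, naming is mine):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the reducer-side pairing within one group; naming is mine
public class PairReducerSketch {
    // rows are the "pid,mean,stddev" records in one group; the output
    // joins each unique pair of records into one pairwise record
    public static List<String> reduce(List<String> rows) {
        List<String> pairs = new ArrayList<>();
        for (int i = 0; i < rows.size(); ++i)
            for (int j = i + 1; j < rows.size(); ++j)
                pairs.add(rows.get(i) + "," + rows.get(j));
        return pairs;
    }
}
```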

Hope it clarifies. If not, please feel free to reply.

Pranab

Manish

My original rough estimate of 1800 for the pairwise combinations of the first char of the product id was correct. There are 62 (26+26+10) alphanumeric characters. To be precise, there will be 62 * 61 / 2 = 1891 combinations.

The mapper task will emit 61 key/value pairs for each record it processes, because the first char of the product id for the current record will combine with 61 other characters.

Pranab

Thanks Pranab,

It’s my fault for mentioning 56 chars (don’t know where I calculated that from :) ).

Have you tried the Mahout implementation instead of using your own Pearson correlation implementation over Hadoop?

Plus, as you mentioned, we can calculate the 1800-odd combinations in the setup method of the mapper. Instead of that, would it be better to have some predefined config file with these kinds of details?

I can see one point: as every mapper is a separate process, it doesn’t make much difference whether we keep something in a config file or in the setup method.

The advantage of a config file could be runtime binding. But that’s not at all major in this solution; just a thought that came to me.

Thanks for nice post and reply.

Regards,

Manish




Spelling of “indetically” should be “identically”.





Hi,

I read your post and found it interesting. I have a doubt: what if, even after dividing, one division or bucket still has many elements? How do we solve this problem, or how can we divide it further?

Amit

I am not sure I understand your question, but let me clarify how the hash join with bucketing works. The mapper generates every possible bucket pair. A reducer gets a subset of those bucket pairs. Each reducer call to reduce() gets a pair of buckets, and in reduce(), each member of one bucket is paired with all members of the other bucket.



Hi Pranab. Can I implement this with Parse.com?

Sure. You can get free technical support from me for your POC through email and github issue tracker. Please provide some details of your project.

Thanks Pranab!

Your article and source code are very helpful for my work, and your project sifarish is excellent.

I met some difficulties when reading the src ‘org.sifarish.social.PearsonCorrelator’. I have read your design of divide and conquer in the post, which is very helpful. But I didn’t fully understand how it works in the src (there are very few comments), mainly the map process in the MR.

What does the variable ‘hashPairMult’ mean? And are there some references about ‘self join by hashing’?

Thank you for sharing the excellent project.

Franq

The parameter hashPairMult is used to map 2 hash values to 1. Basically, it’s used to pair up all possible hash buckets with each other.

Here is a link for hash join

https://en.wikipedia.org/wiki/Hash_join