Making Recommendations in Real Time

Making recommendations based on a user’s current behavior within a small time window is a powerful feature that was recently added to sifarish. In this post I will go over the details of this feature. The real time feature has been added for social collaborative filtering based recommendations.

In our solution, Storm processes the real time user engagement event stream to find recommended items, but Hadoop does a lot of the heavy lifting by computing the item correlation matrix from historical user engagement event data. Redis is used as the glue that connects the various Big Data subsystems.

Hadoop Batch Processing for Item Correlation Matrix

Since the focus of this post is real time processing, I won’t provide a lot of detail on the Hadoop jobs used to compute the item correlation matrix. Such details are available in my earlier posts. We start with user engagement event stream data, which consists of the following fields

  1. userID
  2. sessionID
  3. itemID
  4. eventType

Some pre-processing of the raw click stream data is necessary to generate this kind of event stream data. More details on estimating ratings from events are available in my earlier post. This input is fed into a sequence of Hadoop Map Reduce jobs, whose final output is the item correlation matrix. The details of this Map Reduce workflow are as below

Input | Map Reduce | Output
User engagement events | ImplicitRatingEstimator, which converts the event stream to implicit ratings | Estimated rating as (userID,itemID,rating,timestamp)
Output of previous job | CompactRatingFormatter, which converts estimated ratings to a more compact form | Rating data with one line per item, consisting of itemID followed by a list of (userID,rating) pairs
Output of previous job | ItemDynamicAttributeSimilarity, which calculates item correlations | Item correlation as (itemID1,itemID2,correlation,corrIntersectLength)
Output of previous job | CorrelationMatrixBuilder, which transforms correlation data to a matrix form | Item correlation as itemID followed by a list of (itemID,correlation) pairs

Details about the ImplicitRatingEstimator and ItemDynamicAttributeSimilarity Map Reduce jobs can be found in my earlier posts. If Pearson correlation is to be used instead of cosine or Jaccard, then the PearsonCorrelator Map Reduce job should be used instead of ItemDynamicAttributeSimilarity.
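To make the correlation step concrete, here is a minimal sketch of cosine similarity between two items, starting from the compact rating form (one userID to rating mapping per item). It is not the ItemDynamicAttributeSimilarity implementation; the function name and the choice to return the number of common users alongside the correlation (mirroring the corrIntersectLength field) are my assumptions for illustration.

```python
import math

def cosine_correlation(ratings_a, ratings_b):
    """Cosine similarity between two items' rating vectors.

    Each argument maps userID -> rating for one item.
    Returns (correlation, number of users who rated both items).
    """
    common = ratings_a.keys() & ratings_b.keys()
    norm_a = math.sqrt(sum(r * r for r in ratings_a.values()))
    norm_b = math.sqrt(sum(r * r for r in ratings_b.values()))
    if not common or norm_a == 0 or norm_b == 0:
        return 0.0, len(common)
    # Only users who rated both items contribute to the dot product.
    dot = sum(ratings_a[u] * ratings_b[u] for u in common)
    return dot / (norm_a * norm_b), len(common)
```

Items rated by exactly the same users with the same ratings get a correlation of 1, and items with no users in common get 0.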

The final correlation matrix is stored in Redis cache for later consumption by Storm. For each row of the item correlation matrix, the itemID is used as the cache key and the list of correlated items along with correlation coefficients constitutes the value.
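The key and value layout can be sketched as below. The exact serialization sifarish uses is not described in this post, so the `item:correlation` comma separated encoding, the function names and the correlation values are all hypothetical, and a plain dict stands in for the Redis cache.

```python
def encode_row(correlated):
    """Serialize [(itemID, correlation), ...] into one cache value string."""
    return ",".join(f"{item}:{corr}" for item, corr in correlated)

def decode_row(value):
    """Inverse of encode_row; an empty string decodes to an empty row."""
    if not value:
        return []
    pairs = []
    for field in value.split(","):
        item, corr = field.split(":")
        pairs.append((item, float(corr)))
    return pairs

# A dict stands in for Redis here: key is the itemID of the matrix row,
# value is the encoded list of correlated items. Values are made up.
cache = {}
cache["CY7TJDN8O8"] = encode_row([("A501JBCS5D", 0.62), ("M1BSEJZVQL", 0.35)])
row = decode_row(cache["CY7TJDN8O8"])
```

Keeping one row per key means the real time side only ever fetches the rows for items the user has actually touched.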

One issue that should be kept in mind is that the item correlation matrix needs to be re-computed periodically, because of changing customer population, item inventory and user behavior. In other words, there is a drift in the model.

The frequency of the re-computation and the time window length of the historical data should be dictated by how dynamic the environment is. For example, you could re-compute once a week using the past 6 months of data.

Storm Real Time Processing for Recommendations

Item correlation based collaborative filtering recommendation processing requires two kinds of input

  1. Set of items rated by user and the corresponding rating
  2. Item correlation matrix

As far as the second input is concerned, as described earlier, the item correlation matrix is computed by a sequence of Hadoop Map Reduce jobs using historical user engagement event data.

For the first input, since we are not using explicit user ratings, Storm converts the real time user engagement event stream to estimated ratings for all items that a user is engaging with. For rating estimation, Storm needs the event to rating mapping metadata.

The metadata JSON file is loaded into a Redis cache prior to starting Storm. During the initialization of a Storm bolt, the JSON string is read from the cache and deserialized into a Java object.
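As a rough illustration of what such metadata could look like and how it might be used once deserialized, here is a small sketch. The JSON schema, the field name `ratingsByCount` and the rating values are hypothetical; the real metadata file used by sifarish may be structured differently.

```python
import json

# Hypothetical metadata: for each event type, the estimated rating
# for 1, 2, 3 (or more) occurrences of that event.
METADATA_JSON = """
{
  "events": [
    {"type": 0, "ratingsByCount": [80, 90, 100]},
    {"type": 2, "ratingsByCount": [40, 50, 60]},
    {"type": 3, "ratingsByCount": [10, 20, 30]}
  ]
}
"""

def load_mapping(json_text):
    """Deserialize the metadata into eventType -> list of ratings."""
    doc = json.loads(json_text)
    return {e["type"]: e["ratingsByCount"] for e in doc["events"]}

def rating_for(mapping, event_type, count):
    """Look up the rating for an event type seen `count` (>= 1) times.

    Counts beyond the end of the list are capped at the last entry.
    """
    ratings = mapping[event_type]
    return ratings[min(count, len(ratings)) - 1]

mapping = load_mapping(METADATA_JSON)
```

Doing the deserialization once, at bolt initialization, keeps the per-event path down to a dictionary lookup.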

The Storm topology is simple, consisting of a Spout for the Redis queue and a Bolt for the processing. The parallelism for the Spout and Bolt is provided through configuration parameters. The event tuples are partitioned (field grouped, in Storm terminology) by userID, so that events for a user will always be processed by the same Bolt instance.
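The effect of field grouping can be pictured as hashing the grouping field and taking it modulo the number of Bolt instances. The sketch below only illustrates the idea; the function name is hypothetical and CRC32 is my stand-in, not the hash function Storm actually uses.

```python
import zlib

def bolt_index(user_id, num_bolts):
    """Pick a Bolt instance for a userID with a stable hash.

    The same userID always maps to the same index, which is the
    property the per-user in memory state depends on.
    """
    return zlib.crc32(user_id.encode("utf-8")) % num_bolts

# Both events for this user land on the same Bolt instance.
first = bolt_index("R29PBQ1757I1", 4)
second = bolt_index("R29PBQ1757I1", 4)
```

This is why the Bolt can keep per-user state in local memory without any coordination between instances.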

Our bolt does stateful processing. A user engages with many items during a session. The bolt maintains an in memory queue of events for each (userID, itemID) pair. The event window has different user configurable expiration policies, as below.

  1. Session : Whenever a new session is encountered, the queue is emptied
  2. Time window : Only events within a specified time window are kept. Older events are dropped
  3. Event count : Only a specified number of recent events are kept.

The size of the window at any given time determines to what extent recent events influence the recommendations. A short window implies that only very recent data determines the recommendation output.
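The three expiration policies can be sketched with a small queue class, one instance per (userID, itemID) pair. The class name, the policy names and the parameters `time_span` and `max_events` are hypothetical; this is not the sifarish implementation, and it handles the session policy per queue rather than across all of a user's queues.

```python
from collections import deque

class EventWindow:
    """Event queue for one (userID, itemID) pair with an expiry policy."""

    def __init__(self, policy, time_span=None, max_events=None):
        if policy not in ("session", "time", "count"):
            raise ValueError(f"unknown policy: {policy}")
        self.policy = policy
        self.time_span = time_span    # used by the "time" policy
        self.max_events = max_events  # used by the "count" policy
        self.events = deque()         # (event_type, timestamp), oldest first
        self.session_id = None

    def add(self, session_id, event_type, timestamp):
        # Session policy: a new session empties the queue.
        if self.policy == "session" and session_id != self.session_id:
            self.events.clear()
        self.session_id = session_id
        self.events.append((event_type, timestamp))
        self.expire(timestamp)

    def expire(self, now):
        """Drop old events; can also be called for a user's other items."""
        if self.policy == "time":
            while self.events and now - self.events[0][1] > self.time_span:
                self.events.popleft()
        elif self.policy == "count":
            while len(self.events) > self.max_events:
                self.events.popleft()
```

For example, `EventWindow("time", time_span=600)` would keep only the last ten minutes of events, if timestamps are in seconds.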

The caching of item correlation vectors is done at 2 levels. The first level is an in memory cache based on the Google Guava library. When there is a cache miss at the first level, the Redis cache is looked up.
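The two level lookup can be sketched as follows. A small LRU built on `OrderedDict` stands in for the Guava cache and a dict stands in for Redis; the class name, the capacity parameter and the LRU eviction choice are assumptions made for illustration.

```python
from collections import OrderedDict

class TwoLevelCache:
    """Local LRU cache in front of a slower backing store."""

    def __init__(self, backing_store, capacity):
        self.backing_store = backing_store  # stand-in for Redis (dict-like)
        self.capacity = capacity
        self.local = OrderedDict()          # stand-in for the Guava cache
        self.misses = 0

    def get(self, item_id):
        if item_id in self.local:
            # First level hit: mark as most recently used.
            self.local.move_to_end(item_id)
            return self.local[item_id]
        # First level miss: fall through to the second level.
        self.misses += 1
        row = self.backing_store.get(item_id)
        if row is not None:
            self.local[item_id] = row
            if len(self.local) > self.capacity:
                self.local.popitem(last=False)  # evict least recently used
        return row
```

Since a user tends to touch the same items repeatedly within a window, most lookups should be served by the first level.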

The recommendation output can be written to a Redis queue or cache, depending on how it’s configured. Generally, a new set of recommendations is generated for every event for a user. If the client application does not care about capturing the recommendations for all events, the cache could be used. With the Redis cache, the userID is used as the key and the cache will return the most recent recommendations generated for a user.

Real Time Processing Steps

Essentially, given a user’s interest in certain items as reflected by estimated rating values, we compute the predicted ratings of certain other related items, making use of the item correlation matrix.

When operating in full batch mode, these computations are done through a set of Hadoop Map Reduce jobs. These batch computations are done periodically. During the window of days or weeks between runs, recommended items for all users remain static until the next round of processing.

In contrast, in real time processing, the following processing steps are triggered for every event tuple received from the Redis queue. For every such event, the processing pipeline generates an output of userID followed by a list of itemIDs for the top k recommended items.

  1. The Redis spout forwards the event tuple to the appropriate bolt instance based on userID
  2. The bolt adds the event to the in memory queue for the userID and itemID. Based on the expiry policy, older events are dropped from the queues of all items for this user
  3. For each item event queue, all events are scanned to pick the event with the highest user intent, along with the count of such events. The count and event type are used to estimate the rating for the item
  4. For each item, the item correlation vector is looked up from the cache. Using the estimated rating and the item correlation vector, ratings for the correlated items are calculated
  5. Across all items, the calculated correlated item rating vectors are aggregated and the average is taken
  6. The aggregated and averaged item rating vector is sorted in descending order of rating and the top k items are picked
  7. The top k items for the user are written to a Redis queue or cache, depending on how it’s configured. If written to the cache, userID is used as the cache key

The rating estimation logic for recommended items is the same as in batch mode recommendation processing, as described in an earlier post.
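Steps 3 through 6 above can be sketched in a few lines. This is a simplified illustration, not the sifarish code: the `EVENT_RATINGS` table with its `intent` ranks and rating values is hypothetical, and I assume a predicted rating is the estimated rating multiplied by the correlation, averaged with a plain mean over the contributing items.

```python
from collections import defaultdict

# Hypothetical mapping: intent rank and ratings by event count, per event type.
EVENT_RATINGS = {
    0: {"intent": 3, "ratings": [80, 90, 100]},
    2: {"intent": 2, "ratings": [40, 50, 60]},
    3: {"intent": 1, "ratings": [10, 20, 30]},
}

def estimate_rating(event_types):
    """Step 3: pick the highest intent event and rate by its count."""
    strongest = max(event_types, key=lambda t: EVENT_RATINGS[t]["intent"])
    count = event_types.count(strongest)
    ratings = EVENT_RATINGS[strongest]["ratings"]
    return ratings[min(count, len(ratings)) - 1]

def recommend(user_events, correlations, k):
    """user_events: itemID -> event types currently in the window.
    correlations: itemID -> list of (itemID, correlation).
    Returns the top k (itemID, predicted rating) pairs."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for item_id, event_types in user_events.items():
        rating = estimate_rating(event_types)
        # Step 4: spread the estimated rating over correlated items.
        for other, corr in correlations.get(item_id, []):
            totals[other] += rating * corr
            counts[other] += 1
    # Step 5: average the contributions per correlated item.
    averaged = {item: totals[item] / counts[item] for item in totals}
    # Step 6: sort by descending predicted rating and keep the top k.
    ranked = sorted(averaged.items(), key=lambda p: p[1], reverse=True)
    return ranked[:k]
```

Because the whole computation runs over one user's window only, its cost is bounded by the number of items in the window times the length of a correlation row.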

Sample Input and Output

Here is some event data from the real time stream, consisting of userID, sessionID, itemID and event.

IRJKZ9226AHZ, 506bb5e6-c264-11e3-9c30-1c659d492701, CY7TJDN8O8, 2
R29PBQ1757I1, 506bc482-c264-11e3-b647-1c659d492701, A501JBCS5D, 3
R29PBQ1757I1, 506bc482-c264-11e3-b647-1c659d492701, M1BSEJZVQL, 0
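For reference, a line in this format can be parsed into a typed record as sketched below. The `Event` type and `parse_event` function are hypothetical names used only for illustration.

```python
from typing import NamedTuple

class Event(NamedTuple):
    user_id: str
    session_id: str
    item_id: str
    event_type: int

def parse_event(line):
    """Split one comma separated event line into its four fields."""
    user_id, session_id, item_id, event_type = (
        field.strip() for field in line.split(",")
    )
    return Event(user_id, session_id, item_id, int(event_type))

event = parse_event(
    "IRJKZ9226AHZ, 506bb5e6-c264-11e3-9c30-1c659d492701, CY7TJDN8O8, 2"
)
```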

Here is some sample recommendation output, consisting of userID followed by a list of the top 5 recommended items along with their scores.


What About Storm Limitations

Storm has some limitations. Let’s find out how adversely they affect us. Storm does not provide any support for stateful processing. It leaves state management to the application logic. In our case, the state consists of the following items

  1. In memory event queue
  2. Cached item correlation row vectors
  3. Cached event to rating mapping metadata

With a node failure, we will lose the in memory event queues. The consequence is that when a new Bolt instance is started as part of failure recovery, recommendations for the users that the failed Bolt instance was responsible for will be poor, since the in memory queues will not be fully populated. Eventually the output will stabilize as the queues fill up again. This kind of recovery is also known as convergent rollback recovery.

The correlation row vectors will be reloaded from Redis as needed. The event to rating metadata will also be reloaded from the cache when the new bolt instance initializes.

Since Storm’s recovery is based on upstream backup, it provides at least once message processing semantics and an event may be processed multiple times. But this won’t have any significant impact on the accuracy of recommendation output.

Wrapping Up

There are various Big Data subsystems in play in our solution and there are a number of steps involved in running the example. The steps are detailed in this tutorial document. The tutorial document refers to a shell script through which all the steps can be executed.

Real time recommendation processing does not yet support all the features available for batch mode recommendation processing, e.g., infusing business goals into recommendations. I will be working on supporting all the features available in batch mode in real time recommendation processing.


About Pranab

I am Pranab Ghosh, a software professional in the San Francisco Bay area. I manipulate bits and bytes for the good of living beings and the planet. I have worked with a myriad of technologies and platforms in various business domains for early stage startups, large corporations and anything in between. I am an active blogger and open source contributor. I am passionate about technology and green and sustainable living. My technical interest areas are Big Data, Distributed Processing, NoSQL databases, Data Mining and Programming languages. I am fascinated by problems that don't have a neat closed form solution.

2 Responses to Making Recommendations in Real Time

  1. pranay says:

    Hi Pranab
    Excellent is how I can describe your knowledge about the data mining and big data ecosystems. I read your blogs very often and they have really helped me in my understanding of the whole concepts. I recently started working as a big data consultant and want to develop a predictive algorithm which can help websites send emails or other stuff to their customers based on the customer preferences, browsing history, clicks and other stuff they do on the website. I want to do a POC first on my own laptop. Can you please help me understand how I should go about it? I mean, I have a user database with me which has all the registered users on my website. Can you suggest how I should architect from this point to giving them a real time solution?


  2. Pranab says:


    I am glad you like it. If you want to send periodic recommendations through email to customers, based on customer behavior, you could run the complete batch mode workflow involving multiple MR jobs to process click stream data all the way to generating recommendations for customers.

    This post describes the workflow. You will also find it towards the end of the current post

    This tutorial document provides the details of the steps involved
