Making recommendations based on a user’s current behavior in a small time window is a powerful feature that was recently added to sifarish. In this post I will go over the details of this feature. The real time feature has been added for social collaborative filtering based recommendations.
In our solution, although Storm is used to process the real time user engagement event stream and find recommended items, Hadoop does a lot of the heavy lifting by computing the item correlation matrix from historical user engagement event data. Redis is used as the glue that connects the various Big Data subsystems.
Hadoop Batch Processing for Item Correlation Matrix
Since the focus of this post is real time processing, I won’t provide a lot of detail about the Hadoop jobs used to compute the item correlation matrix. Such details are available in my earlier posts. We start with user engagement event stream data, which consists of the following fields
Some pre-processing of the raw click stream data is necessary to generate this kind of event stream data. More details on estimating ratings from events are available in my earlier post. This input is fed into a sequence of Hadoop Map Reduce jobs to get the final output, an item correlation matrix. The details of this Map Reduce workflow are as below
|Input|Map Reduce job|Output|
|---|---|---|
|User engagement event|ImplicitRatingEstimator, which converts the event stream to implicit ratings|Estimated rating as (userID,itemID,rating,timestamp)|
|Output of previous job|CompactRatingFormatter, which converts the estimated ratings to a more compact form|Rating data with one line per item, consisting of itemID followed by a list of (userID,rating) pairs|
|Output of previous job|ItemDynamicAttributeSimilarity, which calculates item correlations|Item correlation as (itemID1,itemID2,correlation,corrIntersectLength)|
|Output of previous job|CorrelationMatrixBuilder, which transforms the correlation data to matrix form|Item correlation as itemID followed by a list of (itemID,correlation) pairs|
Details about the ImplicitRatingEstimator and ItemDynamicAttributeSimilarity Map Reduce jobs can be found in my earlier posts. If Pearson correlation is to be used instead of cosine or Jaccard, then the PearsonCorrelator Map Reduce job should be used instead of ItemDynamicAttributeSimilarity.
The final correlation matrix is stored in a Redis cache for later consumption by Storm. For each row of the item correlation matrix, the itemID is used as the cache key and the list of correlated items along with their correlation coefficients constitutes the value.
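As a rough illustration of this key and value layout, here is a minimal Python sketch. A plain dict stands in for the Redis cache, and the `itemID:correlation` value encoding, the function names and the correlation values are all assumptions made for the example, not the format sifarish actually writes.

```python
# Sketch only: a dict stands in for the Redis cache, and the value
# encoding ("itemID:correlation,...") is an assumed format.

def store_correlation_row(cache, item_id, correlated):
    """Store one matrix row: the itemID is the key, the correlated items the value."""
    cache[item_id] = ",".join(f"{other}:{corr}" for other, corr in correlated)


def load_correlation_row(cache, item_id):
    """Return the row as a list of (itemID, correlation) pairs, or [] if absent."""
    raw = cache.get(item_id)
    if not raw:
        return []
    row = []
    for pair in raw.split(","):
        other, corr = pair.split(":")
        row.append((other, int(corr)))
    return row


cache = {}
# Hypothetical row with made-up, integer-scaled correlation values.
store_correlation_row(cache, "CY7TJDN8O8", [("A501JBCS5D", 812), ("M1BSEJZVQL", 347)])
print(load_correlation_row(cache, "CY7TJDN8O8"))
```

Keeping one row per key means a bolt only needs to fetch the rows for the items a user is currently engaging with, never the whole matrix.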
One issue that should be kept in mind is that the item correlation matrix needs to be re-computed periodically, because of changing customer population, item inventory and user behavior. In other words, there is a drift in the model.
The frequency of the re-computation and the time window length of the historical data should be dictated by how dynamic the environment is. Here is an example: re-compute once a week using the past 6 months of data.
Storm Real Time Processing for Recommendations
Item correlation based collaborative filtering recommendation processing requires two kinds of input:
- Set of items rated by user and the corresponding rating
- Item correlation matrix
As far as the second item is concerned, as described earlier, the item correlation matrix is computed by a sequence of Hadoop Map Reduce jobs using historical user engagement event data.
For the first item, since we are not using explicit user ratings, Storm converts the real time user engagement event stream to estimated ratings for all items that a user is engaging with. For rating estimation, Storm needs the event to rating mapping metadata.
The metadata JSON file is loaded into a Redis cache prior to starting Storm. During the initialization of a Storm bolt, the JSON string is read from the cache and de-serialized to a Java object.
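To make the idea concrete, here is a small Python sketch of loading such metadata and using it for a lookup. The JSON schema, the cache key, the rating values and the function names are all hypothetical; the real metadata file used by sifarish may look quite different. A dict stands in for Redis.

```python
import json

# Hypothetical metadata: for each event type, an "intent" rank (higher means
# stronger interest) and the rating to assign for 1, 2, and 3 or more
# occurrences. This schema is an assumption for illustration only.
METADATA_JSON = """
{
  "events": [
    {"type": 0, "intent": 3, "ratings": [70, 85, 100]},
    {"type": 1, "intent": 2, "ratings": [40, 55, 65]},
    {"type": 2, "intent": 1, "ratings": [15, 25, 30]}
  ]
}
"""


def load_mapping(cache, key="eventRatingMetadata"):
    """Read the JSON string from the cache and turn it into a lookup table."""
    parsed = json.loads(cache[key])
    return {entry["type"]: entry for entry in parsed["events"]}


def estimate_rating(mapping, event_type, count):
    """Look up the rating for an event type seen `count` times (count >= 1)."""
    ratings = mapping[event_type]["ratings"]
    return ratings[min(count, len(ratings)) - 1]


cache = {"eventRatingMetadata": METADATA_JSON}  # dict standing in for Redis
mapping = load_mapping(cache)
print(estimate_rating(mapping, 1, 2))
```

Loading the mapping once at bolt initialization keeps the per-event path free of any metadata parsing.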
The Storm topology is simple, consisting of a Spout for the Redis queue and a Bolt for the processing. The parallelism for the Spout and Bolt is provided through configuration parameters. The event tuples are partitioned by userID (a fields grouping in Storm terminology), so that events for a user will always be processed by the same Bolt instance.
Our bolt does stateful processing. A user engages with many items during a session. The bolt maintains an in-memory queue of events for each userID,itemID pair. The event window has different user configurable expiration policies, as below.
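The effect of partitioning by userID can be mimicked in a few lines of Python. This is only a sketch of the idea: the function name is hypothetical, and the CRC32 hash is a stand-in chosen for the example, not the hash Storm uses internally.

```python
import zlib


def bolt_task_for(user_id, num_bolt_tasks):
    """Pick a bolt task index from a stable hash of the grouping field."""
    return zlib.crc32(user_id.encode("utf-8")) % num_bolt_tasks


# (userID, itemID, event) tuples taken from the sample input in this post.
events = [
    ("IRJKZ9226AHZ", "CY7TJDN8O8", 2),
    ("R29PBQ1757I1", "A501JBCS5D", 3),
    ("R29PBQ1757I1", "M1BSEJZVQL", 0),
]
for user_id, item_id, event in events:
    print(user_id, item_id, "-> bolt task", bolt_task_for(user_id, 4))
```

Because the task index depends only on the userID, both events for R29PBQ1757I1 land on the same task, which is what allows the bolt to keep per-user state in memory.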
- Session : Whenever a new session is encountered, the queue is emptied
- Time window : Only events within a specified time window are kept. Older events are dropped
- Event count : Only a specified number of recent events are kept.
The size of the window at any given time determines to what extent recent events influence the recommendations. A short window implies that only very recent data determines the recommendation output.
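The three expiration policies can be sketched as follows. This is a simplified illustration for a single userID,itemID queue, assuming events arrive in timestamp order; the class name, the policy names and the parameters are hypothetical and not taken from the sifarish code.

```python
from collections import deque


class EventWindow:
    """Queue of (timestamp, sessionID, event) tuples for one userID,itemID pair."""

    def __init__(self, policy, max_age_sec=None, max_events=None):
        self.policy = policy            # "session", "time" or "count"
        self.max_age_sec = max_age_sec  # used by the "time" policy
        self.max_events = max_events    # used by the "count" policy
        self.events = deque()

    def add(self, timestamp, session_id, event):
        if self.policy == "session":
            # A new session empties the queue before the event is added.
            if self.events and self.events[-1][1] != session_id:
                self.events.clear()
        self.events.append((timestamp, session_id, event))
        if self.policy == "time":
            # Drop events that fall outside the time window.
            while timestamp - self.events[0][0] > self.max_age_sec:
                self.events.popleft()
        elif self.policy == "count":
            # Keep only the most recent max_events events.
            while len(self.events) > self.max_events:
                self.events.popleft()


window = EventWindow("count", max_events=2)
for ts, ev in [(1, 2), (2, 3), (3, 0)]:
    window.add(ts, "s1", ev)
print(list(window.events))
```

A smaller `max_age_sec` or `max_events` makes the recommendations track the most recent behavior more closely, at the cost of having fewer events to estimate ratings from.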
The item correlation vectors are cached at two levels. The first level is an in-process cache based on the Google Guava library. When there is a cache miss at the first level, the Redis cache is looked up.
The recommendation output can be written to a Redis queue or cache, depending on how it’s configured. Generally, for every event for a user, a new set of recommendations is generated. If the client application does not care about capturing the recommendations for all events, the cache could be used. With the Redis cache, the userID is used as the key and the cache will return the most recent recommendations generated for a user.
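The lookup order can be illustrated with a short Python sketch. Here a small LRU map plays the role of the first level and a dict plays the role of Redis; the class name, the LRU eviction policy and the size limit are assumptions for the example, since the actual Guava cache configuration is not described here.

```python
from collections import OrderedDict


class TwoLevelCache:
    """Small in-process LRU cache in front of a slower backing store."""

    def __init__(self, backing_store, max_entries):
        self.backing_store = backing_store  # dict standing in for Redis
        self.max_entries = max_entries
        self.local = OrderedDict()
        self.backing_lookups = 0

    def get(self, item_id):
        if item_id in self.local:
            self.local.move_to_end(item_id)     # mark as recently used
            return self.local[item_id]
        self.backing_lookups += 1               # first-level miss
        value = self.backing_store.get(item_id)
        if value is not None:
            self.local[item_id] = value
            if len(self.local) > self.max_entries:
                self.local.popitem(last=False)  # evict least recently used
        return value


# Hypothetical correlation rows with made-up values.
store = {"A": [("B", 5)], "B": [("A", 5)], "C": [("A", 2)]}
vectors = TwoLevelCache(store, max_entries=2)
vectors.get("A")
vectors.get("A")
print(vectors.backing_lookups)
```

The second `get("A")` is served from the first level, so the backing store is consulted only once.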
Real Time Processing Steps
Essentially, given a user’s interest in certain items as reflected by the estimated rating values, we compute the predicted ratings of certain other related items, making use of the item correlation matrix.
When operating in full batch mode, these computations are done through a set of Hadoop Map Reduce jobs. These batch computations are done periodically. During that window of days or weeks, the recommended items for all users remain static until the next round of processing.
In contrast, in real time processing, the following processing steps are triggered for every event tuple received from the Redis queue. For every such event, the processing pipeline generates an output consisting of the userID followed by a list of itemIDs for the top k recommended items.
|Step|Processing|
|---|---|
|1|The Redis spout forwards the event tuple to the appropriate bolt instance based on userID|
|2|The bolt adds the event to the in-memory queue for the userID and itemID. Based on the expiry policy, older events are dropped from the queues of all items for this user|
|3|For each item event queue, all events are scanned to pick the event with the highest user intent, along with the count of such events. The count and event type are used to estimate the rating for the item|
|4|For each item, the item correlation vector is looked up from the cache. For each item, using the estimated rating and the item correlation vector, ratings for the correlated items are calculated|
|5|For all items, the calculated correlated item rating vectors are aggregated and the average is taken|
|6|The aggregated and averaged item rating vector is sorted in descending order of rating and the top k items are picked|
|7|The top k items for a user are written to a Redis queue or cache, depending on how it’s configured. If written to the cache, the userID is used as the cache key|
The rating estimation logic for recommended items is the same as in batch mode recommendation processing, as described in an earlier post.
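Steps 3 through 6 can be sketched in Python as below. This is a simplified illustration, not the sifarish implementation: scoring a correlated item as the product of estimated rating and correlation is an assumption, and the function names, intent ranks, ratings and correlation values are all made up for the example.

```python
from collections import defaultdict


def pick_rating_event(events, intent_rank):
    """Step 3: highest-intent event type in the queue and how often it occurs."""
    best = max(events, key=lambda e: intent_rank[e])
    return best, events.count(best)


def recommend(estimated_ratings, correlation_rows, top_k):
    """Steps 4 to 6: score correlated items, average the scores, keep the top k."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for item_id, rating in estimated_ratings.items():
        for other, corr in correlation_rows.get(item_id, []):
            totals[other] += rating * corr  # assumed scoring: rating x correlation
            counts[other] += 1
    averaged = {item: totals[item] / counts[item] for item in totals}
    ranked = sorted(averaged.items(), key=lambda kv: kv[1], reverse=True)
    return ranked[:top_k]


# Hypothetical intent ranks: a higher value means stronger user intent.
intent_rank = {0: 1, 2: 2, 3: 3}
print(pick_rating_event([2, 3, 3, 0], intent_rank))

# Hypothetical estimated ratings and integer-scaled correlation rows.
ratings = {"I1": 80, "I2": 40}
rows = {
    "I1": [("I3", 900), ("I4", 500)],
    "I2": [("I3", 500), ("I5", 800)],
}
print(recommend(ratings, rows, top_k=2))
```

In this example I3 is correlated with both engaged items, so its two scores are averaged before ranking.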
Sample Input and Output
Here is some event data from the real time stream, consisting of userID, sessionID, itemID and event.
IRJKZ9226AHZ, 506bb5e6-c264-11e3-9c30-1c659d492701, CY7TJDN8O8, 2
R29PBQ1757I1, 506bc482-c264-11e3-b647-1c659d492701, A501JBCS5D, 3
R29PBQ1757I1, 506bc482-c264-11e3-b647-1c659d492701, M1BSEJZVQL, 0
Here is some sample recommendation output, consisting of userID followed by a list of the top 5 recommended items along with their scores.
IBNZZA2ZN4EQ,FC3R48KUMX:19440,3EJ3IH3260:18920,K3VYYXRRZ7:15040,JXY0NCS22Y:11840,VSO7PV3GU4:3440
ZL1EDYT4RX0A,A3S51X0MXQ:3600,FKV3M5HUPK:3208,SV24KIASSU:2912,ISWIQDPZGT:2896,YO3XIIL7ES:472
IN2F94UKNLR3,I871XXOSYF:15986,S910V5YJF6:14200,CY7TJDN8O8:12695,1H092ZJIST:11720,Y2NBXWIC5S:9726
ZDJ44TJ21AYM,O9LKR00D2U:8280,2KYFI6J3CD:7820,48X0F01WR2:7100,YLT6QBJ433:7080,TNL7373JAE:1800
IN2F94UKNLR3,CY7TJDN8O8:16228,I871XXOSYF:15986,1H092ZJIST:14944,Y2NBXWIC5S:14690,S910V5YJF6:14200
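A client consuming this output could split each record as in the following Python sketch. The function name is hypothetical, and the parsing simply follows the `userID,itemID:score,...` layout visible in the sample records.

```python
def parse_recommendation(line):
    """Split 'userID,itemID:score,...' into (userID, [(itemID, score), ...])."""
    fields = line.strip().split(",")
    user_id = fields[0]
    items = []
    for field in fields[1:]:
        item_id, score = field.split(":")
        items.append((item_id, int(score)))
    return user_id, items


line = ("IBNZZA2ZN4EQ,FC3R48KUMX:19440,3EJ3IH3260:18920,"
        "K3VYYXRRZ7:15040,JXY0NCS22Y:11840,VSO7PV3GU4:3440")
user_id, items = parse_recommendation(line)
print(user_id, items[0])
```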
What About Storm Limitations
Storm has some limitations. Let’s find out how adversely they affect us. Storm does not provide any support for stateful processing. It leaves state management to the application logic. In our case, the state consists of the following items
- In-memory event queues
- Cached item correlation row vectors
- Cached event to rating mapping metadata
With a node failure, we will lose the in-memory event queues. As a consequence, when a new bolt instance is started as part of failure recovery, recommendations for the users that the failed Bolt instance was responsible for will be poor, since the in-memory queues will not be fully populated. Eventually the output will stabilize as the queues fill up again. This kind of recovery is also known as convergent rollback recovery.
The correlation row vectors will be reloaded from Redis as needed. The event to rating metadata will also be reloaded from the cache when the new bolt instance initializes.
Since Storm’s recovery is based on upstream backup, it provides at-least-once message processing semantics and an event may be processed multiple times. But this won’t have any significant impact on the accuracy of the recommendation output.
There are various Big Data subsystems in play in our solution and there are a number of steps involved in running the example. The steps are detailed in this tutorial document. The tutorial document refers to a shell script through which all the steps can be executed.
Real time recommendation processing does not yet support all the features available for batch mode recommendation processing, e.g., infusing business goals into recommendations. I will be working on supporting all the features available in batch mode in real time recommendation processing.