When we hear about trending, Twitter trending immediately comes to mind. However, there are many other scenarios where such analysis is applicable. Some example use cases are:

1. *Top 5 videos watched in the last 2 hours*
2. *Top 10 news stories browsed in the last 15 minutes*
3. *Top 10 products that users have interacted with in the last 12 hours*
4. *Count of some reading from a patient monitoring wearable exceeding some threshold more than 5 times in the last 10 minutes*

These problems are collectively known as the *heavy hitters problem*.

These problems have three characteristics. First, we want the answer in real time, as soon as the data is available. Second, the answer does not need to be exact. Since we are interested in the ranking of the top n items by popularity, an approximate answer is acceptable as long as the error is small and within some bound. Third, since the computation is done in memory, the memory requirement of the algorithms should be within reasonable bounds.

Fortunately, such algorithms are available, and they are collectively known as *probabilistic aggregation algorithms*. I have implemented some of the popular ones in my OSS project *hoidla* on GitHub. With the evolution of the Internet of Things (IoT) and the deluge of sensor data, we are going to see increasing usage of these algorithms.

In this post, we will use one of these algorithms, called *count min sketch*, to find the trending products on an eCommerce site, which is the 3rd use case listed above. The Storm based implementation is part of my OSS recommendation and personalization engine *sifarish* on GitHub. Showing trending products is one of the ways to address the cold start problem.

## Approximate Aggregate Query

There is a wide range of problems that come under this umbrella term. Frequent items, or top hitters, which is the topic of this post, is one of them. They are all aggregate queries involving some kind of counting. They are approximate and typically based on some kind of probabilistic data structure. Here is the list:

- *Count of unique items, e.g., count of unique web pages visited in a day*
- *Whether an item belongs to a set, e.g., has this video been watched today*
- *Histogram and percentile, e.g., web session length below the 75% quantile*
- *Top n frequent items, e.g., top 5 most visited content items in the last hour*
- *Aggregate query*
- *Range query*

These algorithms are most useful when the domain range of the variable under consideration is very large. For small domain range the problem is deterministic and the solution is trivial.

For example, if there are only several thousand products on an eCommerce site and you want to track the time spent by users browsing them, you could construct a histogram directly from the raw data. However, if you are only interested in items with a high frequency count, it’s wasteful to track a complete histogram.

Even with a small domain size, you may want to use the approximate algorithms if you are willing to sacrifice some accuracy for faster query response and limited resource usage.

## Synopsis Based Solution

A synopsis is a data structure that captures the essence or summary of the data seen so far from a real time stream. There are two basic operations on the data structure:

- *Update when a new tuple arrives in the stream*
- *Query to get some results*

The size of the data structure is independent of the size of the stream. However, the size of the data structure, and hence the resource demand, is in many cases determined by a user specified maximum error bound on the query result.

There are various synopsis data structures. A particular data structure and its associated algorithms tend to work best for some specific problem among those listed earlier. There is no solution that works equally well for all problems.

| Synopsis | Description |
| --- | --- |
| Sampling | Uses the same representation as the raw data, e.g., random sampling, reservoir based sampling |
| Histogram | Count based representation, e.g., equi depth, v-optimal |
| Sketches | Summarizes frequency distribution with multiple hash functions, e.g., count sketches, count min sketches |
| Wavelet | Transforms data into a set of wavelet coefficients representing different levels of granularity, e.g., Haar wavelet function |

An efficient synopsis data structure and its associated algorithm are characterized by the following qualities:

- *Completely memory resident with low memory footprint*
- *Low update time*
- *Low query time*

Low memory requirement usually does not pose a problem. However, for high speed streams, minimizing update time and query time is crucially important.

To put this issue in proper perspective, imagine a plant monitoring system with hundreds of sensors generating readings at high speed. There may be very stringent SLA requirements for quickly triggering an alarm when the count of high reading values within a time window goes above some threshold.

## Heavy Hitters

Since our focus is on the heavy hitters problem, we will take a more in depth look at some of the popular top frequency count algorithms. These are all implemented in *hoidla*.

- *Manku and Motwani Lossy Counting*: It’s a deterministic counting based algorithm. As counts for existing items are increased, items whose counts fall below some threshold are periodically dropped.
- *Misra and Gries Frequent Items*: This is also a deterministic counting based algorithm. A fixed number of item buckets is maintained. As new items arrive, if a bucket exists for the item, its count is incremented. If no bucket exists and a free one is available, the item takes it with a count of 1. If all buckets are taken, the count is decremented for all buckets, and any bucket whose count reaches zero is freed.
- *Count Sketch*: A family of hash functions is maintained. As new items arrive, for each hash function, the counter in the corresponding hash bucket is incremented or decremented, depending on a second sign hash.
- *Count Min Sketch*: A family of hash functions is maintained. As new items arrive, for each hash function, the counter in the corresponding hash bucket is incremented.

Although this post is about probabilistic aggregation algorithms, I have included the first two algorithms even though they are not probabilistic. With the last two algorithms, you can also get the frequency count for any item, in addition to the heavy hitters.
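To make the bucket mechanics of the Misra and Gries algorithm concrete, here is a minimal Python sketch. It is only an illustration and not the *hoidla* implementation, which is a Java API; the function name `misra_gries` and the parameter `num_buckets` are assumptions made for this example.

```python
def misra_gries(stream, num_buckets):
    """Return candidate frequent items using at most num_buckets counters.

    Illustrative sketch only; names are hypothetical, not the hoidla API.
    """
    counters = {}
    for item in stream:
        if item in counters:
            # A bucket already exists for the item: increment it.
            counters[item] += 1
        elif len(counters) < num_buckets:
            # A free bucket is available: the item takes it.
            counters[item] = 1
        else:
            # All buckets are taken: decrement every bucket and
            # free the ones whose count reaches zero.
            for key in list(counters):
                counters[key] -= 1
                if counters[key] == 0:
                    del counters[key]
    return counters


stream = ["a", "b", "a", "c", "a", "d", "a", "b", "a"]
candidates = misra_gries(stream, num_buckets=2)
print(candidates)
```

The surviving counts are lower bounds on the true counts, so the result should be read as a set of candidates. A second pass over the data, when that is possible, would give exact counts for them.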

## Count Min Sketch

We will take a detailed look at this algorithm, since it is the one I have used in implementing the most frequently interacted product use case. The basic inputs to the algorithm are the maximum allowable error (ε) for item frequency and an upper bound on the probability (δ) of exceeding that error. The algorithm works as follows:

- **We maintain a matrix of (d x w) hash buckets**, where d is the number of hash functions and w is the range of hash values. The width w is determined by ε and the depth d is determined by δ. Each bucket contains a counter.
- **As a new item arrives**, we apply each hash function and increment the corresponding hash bucket counter. We increment d counters.
- **To find the frequency count for an item**, we apply the d hash functions and read the counter values from the corresponding buckets. We end up with d counter values. We take the minimum of the counter values and return that as the estimated count.

There is an intuition behind taking the minimum of the counter values. Since counters are only ever incremented, a bucket can never undercount an item. The count for a given item under any one of the hash functions will generally be overestimated because of hash collisions, although the error caused by collisions tends to go down as the skewness of the data increases. By taking the minimum of all the counters, we minimize the overestimation and hence the error.
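The update and query steps described above can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the *hoidla* code: the class and method names are made up for this example, the d hash functions are simulated by salting SHA-256 with the row number, and the sizing uses the standard choice w = ⌈e/ε⌉ and d = ⌈ln(1/δ)⌉, under which the overestimate stays within ε times the total count with probability at least 1 − δ.

```python
import hashlib
import math


class CountMinSketch:
    """Illustrative count min sketch; names are hypothetical."""

    def __init__(self, epsilon, delta):
        # Standard sizing: width from the error bound, depth from
        # the probability of exceeding that bound.
        self.width = math.ceil(math.e / epsilon)
        self.depth = max(1, math.ceil(math.log(1.0 / delta)))
        self.table = [[0] * self.width for _ in range(self.depth)]
        self.total = 0

    def _bucket(self, row, item):
        # Assumption: one hash function per row, simulated by
        # prefixing the item with the row number before hashing.
        digest = hashlib.sha256(f"{row}:{item}".encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % self.width

    def add(self, item, count=1):
        # Increment one counter in each of the d rows.
        for row in range(self.depth):
            self.table[row][self._bucket(row, item)] += count
        self.total += count

    def estimate(self, item):
        # The minimum over the d rows is the least overestimated value.
        return min(
            self.table[row][self._bucket(row, item)]
            for row in range(self.depth)
        )


sketch = CountMinSketch(epsilon=0.01, delta=0.01)
for _ in range(5):
    sketch.add("RK2958CQ")
for _ in range(3):
    sketch.add("D2V62ZV9")
print(sketch.width, sketch.depth, sketch.estimate("RK2958CQ"))
```

With ε = 0.01 and δ = 0.01 the table has 5 rows of 272 counters, regardless of how many distinct items flow through the stream.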

The number of most frequent items to be tracked is defined by the configuration parameter *sketches.most.freq.count*. Only those items whose frequency count is above a certain percentage of the total count are tracked. The percentage is set by the configuration parameter *sketches.freq.count.lim.percent*.
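As a rough illustration of how these two settings interact, the following Python sketch filters estimated counts by the percentage threshold and then keeps the top n. The function and argument names are hypothetical, loosely modeled on the two configuration parameters, and the input values are made up; how *sifarish* applies the parameters internally may differ.

```python
def top_frequent(estimates, total_count, most_freq_count, lim_percent):
    """Pick the top items whose count exceeds a share of the total.

    estimates: dict of itemID -> estimated count (e.g. from a sketch).
    All names here are illustrative assumptions.
    """
    threshold = total_count * lim_percent / 100.0
    eligible = [
        (item, count) for item, count in estimates.items()
        if count > threshold
    ]
    # Highest count first; ties are broken by itemID for stable output.
    eligible.sort(key=lambda pair: (-pair[1], pair[0]))
    return eligible[:most_freq_count]


# Made-up estimates; "OTHER0001" is a placeholder item.
estimates = {"3K3UE2CC": 24, "RK2958CQ": 18, "D2V62ZV9": 13, "OTHER0001": 2}
print(top_frequent(estimates, total_count=100, most_freq_count=2, lim_percent=10))
```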

## Input Stream

The input stream consists of user and item engagement data as described in my earlier post. A tuple in the stream consists of (*userID, sessionID, itemID, event*). This data is derived from raw click stream data.

## Storm Topology

The Storm topology consists of the following three processing layers. A *Redis* queue is used to feed the input stream to Storm. Output is written to another Redis queue.

- **A Redis spout** to ingest the input stream. The spout is connected to the next bolt layer through shuffle grouping.
- **A bolt layer** consisting of one or more bolts implementing count min sketch. Because of shuffle grouping, the sketch processing is parallelized. Each bolt instance processes a subset of the data and maintains a sketch built from that subset. When a message tuple is received, the sketch is updated. This bolt is enabled with tick tuples, so that it receives a tick tuple from Storm at regular intervals. When the tick tuple is received, it gets the top n frequent items from the sketch and sends them to the downstream bolt. It’s connected to the next bolt layer through shuffle grouping. Actually, the grouping does not matter, since there is only one bolt in the next layer.
- **A bolt layer** with one bolt performing synchronization and aggregation. When it receives the frequent items from all the upstream bolt instances, it does a merge sort and finds the final set of top n frequent items. It writes them to another Redis queue. Alternatively, the upstream bolts could also send their sketches. Since sketches are additive, the bolt could combine all the sketches to get the final one.
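The additive property mentioned in the last layer is easy to show. The Python sketch below adds the counter matrices element by element. It assumes every upstream bolt uses the same d, the same w and the same hash functions, otherwise the buckets would not line up. The function name and the tiny matrices are made up for illustration.

```python
def merge_sketches(tables):
    """Combine count min sketch tables by element-wise addition.

    tables: list of d x w counter matrices (lists of lists).
    Assumes all tables were built with identical hash functions.
    """
    depth = len(tables[0])
    width = len(tables[0][0])
    merged = [[0] * width for _ in range(depth)]
    for table in tables:
        if len(table) != depth or any(len(row) != width for row in table):
            raise ValueError("sketches must share the same dimensions")
        for r in range(depth):
            for c in range(width):
                merged[r][c] += table[r][c]
    return merged


# Two hypothetical 2 x 3 sketches from two bolt instances.
bolt_a = [[1, 0, 2], [0, 3, 0]]
bolt_b = [[0, 4, 1], [2, 0, 2]]
print(merge_sketches([bolt_a, bolt_b]))
```

Querying the merged table gives the same estimate as a single sketch that had seen the whole stream, which is why this option avoids the loss of information that comes with merging only the partial top n lists.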

When a message is received, counters in *d* hash buckets are incremented for that item. As described earlier, the last field of the message tuple is an event type indicating the level of interest a user has in the item. More details on this can be found in this earlier post. As an improvement, instead of incrementing the counter by one, we could increase it by a value proportional to the user’s interest level.

## Time Sensitive Count

The sketch as described so far is built from all the data observed in a stream. However, in real time analytics we are generally interested in recent data over a predefined time window.

One approach could be to store a list of time stamps instead of a simple counter in each of the d x w hash buckets. As the window slides with passing time, we could discard the older time stamps. With this approach, the memory requirement becomes O(n), where n is the number of tuples in the window, instead of O(d x w), which could be significantly larger.

## Epoch Based Window

I have adopted an epoch based approach, with a smaller memory footprint, for managing the time window. An epoch covers a finite span of time and the window consists of multiple epochs. We keep track of counts per epoch. Each of the d x w hash buckets contains a list of epochs.

As the window advances with time, we create a new epoch, add it to the list and drop the oldest epoch from the window. The memory requirement becomes O(d x w x e), where e is the number of epochs in the window.

Creation and expiry of epochs are triggered by the Storm tick tuple. An epoch is defined as a multiple of the tick interval. For example, the tick interval could be 1 minute, the epoch length 5 minutes and the window length 30 minutes. With this setting, every 5 ticks a new epoch is created and the oldest one is expired. The window will consist of 6 epochs.
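Here is a minimal Python sketch of a single hash bucket under this scheme, using the 1 minute tick, 5 minute epoch and 30 minute window from the example. The class and method names are assumptions for illustration only; the actual Java implementation may organize the epochs differently.

```python
from collections import deque


class EpochWindowCounter:
    """One hash bucket holding per-epoch counts for a sliding window.

    Illustrative sketch; names are hypothetical.
    """

    def __init__(self, ticks_per_epoch, epochs_per_window):
        self.ticks_per_epoch = ticks_per_epoch
        # A bounded deque: appending to a full deque drops the oldest epoch.
        self.epochs = deque([0], maxlen=epochs_per_window)
        self.ticks_in_epoch = 0

    def increment(self, count=1):
        # Updates always go to the current (newest) epoch.
        self.epochs[-1] += count

    def on_tick(self):
        # Called once per tick; every ticks_per_epoch ticks a new
        # epoch is opened and, if the window is full, the oldest expires.
        self.ticks_in_epoch += 1
        if self.ticks_in_epoch == self.ticks_per_epoch:
            self.epochs.append(0)
            self.ticks_in_epoch = 0

    def value(self):
        # The bucket count over the window is the sum over live epochs.
        return sum(self.epochs)


bucket = EpochWindowCounter(ticks_per_epoch=5, epochs_per_window=6)
bucket.increment(3)
for _ in range(5):          # 5 minutes pass, a second epoch starts
    bucket.on_tick()
bucket.increment(2)
print(bucket.value())       # both epochs are still inside the window
for _ in range(25):         # the first epoch ages out of the window
    bucket.on_tick()
print(bucket.value())
```

A full sketch would hold one such counter in each of the d x w buckets, which gives the O(d x w x e) memory requirement mentioned above.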

## Tumbling Window

With the epoch based time window, the window slides with marching time. A window always overlaps with some of its predecessor windows. One advantage is that the changing data stream is handled in a smooth way.

Another option is to slide the window in such a way that there is no overlap with the previous window. It’s called tumbling. The choice is made through the configuration parameter *sketches.expiry.policy*. The window is aligned with the wall clock. For example, the window length could be 24 hours and it could tumble to a new window at midnight. After a new window starts, since there is no data in the window, the query result will not be very reliable until a significant amount of stream data has been processed.
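The wall clock alignment can be sketched as follows in Python. This is an illustrative assumption about how such a window could be computed, not the actual implementation: the names `window_start` and `TumblingCounter` are hypothetical, and the code assumes the window length divides 24 hours evenly so that windows line up with midnight.

```python
from datetime import datetime, timedelta


def window_start(now, window_length):
    """Return the start of the wall clock aligned window containing now."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    elapsed = now - midnight
    # Integer number of whole windows since midnight.
    return midnight + (elapsed // window_length) * window_length


class TumblingCounter:
    """A counter that resets whenever a new window begins."""

    def __init__(self, window_length):
        self.window_length = window_length
        self.current_start = None
        self.count = 0

    def increment(self, now, count=1):
        start = window_start(now, self.window_length)
        if start != self.current_start:
            # Tumble: the previous window's data is discarded entirely.
            self.current_start = start
            self.count = 0
        self.count += count


counter = TumblingCounter(timedelta(hours=24))
counter.increment(datetime(2015, 1, 1, 23, 59, 0))
counter.increment(datetime(2015, 1, 1, 23, 59, 30))
counter.increment(datetime(2015, 1, 2, 0, 0, 5))   # first event after midnight
print(counter.count)
```

The reset at the window boundary is what makes early query results unreliable: right after midnight the counter reflects only a few seconds of data.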

## Output

As I mentioned before, the frequency counts for the heavy hitter items are written to a Redis queue by the second layer of bolts. Here is some sample output. Each line consists of a set of itemIDs, with each itemID followed by its count.

RK2958CQ:4:3K3UE2CC:1:D2V62ZV9:1 RK2958CQ:5:D2V62ZV9:3:3K3UE2CC:2 RK2958CQ:5:D2V62ZV9:3:3K3UE2CC:2 RK2958CQ:5:D2V62ZV9:3:3K3UE2CC:2 RK2958CQ:5:D2V62ZV9:3:3K3UE2CC:2 RK2958CQ:5:D2V62ZV9:3:3K3UE2CC:2 RK2958CQ:5:D2V62ZV9:3:3K3UE2CC:2 3K3UE2CC:17:RK2958CQ:17:D2V62ZV9:14 3K3UE2CC:17:RK2958CQ:17:D2V62ZV9:14 3K3UE2CC:17:RK2958CQ:17:D2V62ZV9:14 3K3UE2CC:17:RK2958CQ:17:D2V62ZV9:14 3K3UE2CC:24:RK2958CQ:18:D2V62ZV9:13 3K3UE2CC:24:RK2958CQ:18:D2V62ZV9:13 3K3UE2CC:24:RK2958CQ:18:D2V62ZV9:13

Each line has a list of (itemID, count) pairs. We can see how the frequency counts evolve as the streaming data is processed.
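A consumer reading from the output queue would need to split each record back into pairs. Here is a small Python sketch of that step, assuming the colon separated format shown in the sample output; the function name `parse_line` is made up for this example.

```python
def parse_line(line):
    """Split 'itemID:count:itemID:count...' into (itemID, count) pairs."""
    fields = line.strip().split(":")
    if len(fields) % 2 != 0:
        raise ValueError("expected alternating itemID and count fields")
    return [
        (fields[i], int(fields[i + 1]))
        for i in range(0, len(fields), 2)
    ]


pairs = parse_line("3K3UE2CC:24:RK2958CQ:18:D2V62ZV9:13")
print(pairs)
```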

## Final Thoughts

The algorithms described here have been an active area of research for many years. They have been widely used in the telecom industry, mostly for network traffic analytics, since the nineties. With the evolution of the Internet of Things (IoT), there will be a resurgence of these types of algorithms for real time approximate analytics.

I have implemented many of them as a plain Java API in my OSS project hoidla. I have kept the algorithm implementation completely agnostic of the cluster framework, so that they can be used from any platform, whether *Storm*, *Spark Streaming* or something else.

One of the costly mistakes I made in the past in my machine learning projects was to embed the algorithm implementations deep inside *Hadoop* map reduce, rendering them unusable from any other cluster computation framework. My policy now is to completely decouple the algorithm implementation from any cluster computation framework. Here is the tutorial with instructions on how to run the example.
