Big Data Caught in Storm


Hadoop is great for batch processing. However, depending on the incoming data throughput and the cluster characteristics, there is a minimum latency threshold for processing data. An earlier blog post of mine is based on a simple performance model for Hadoop that allows you to find the minimum possible latency for a data processing scenario, depending on a number of parameters.

What are our options when we are processing big data and need near real time processing with very tight SLA requirements?

Essentially, we have an incoming stream of high velocity data and we want to process a record as soon as it arrives. This is also known as stream processing or continuous computing.

We cannot afford to save the data, create a batch when sufficient data has accumulated, and then run Hadoop.

There are two promising products. One is Storm from Twitter, which has been open sourced recently. Storm is used by Twitter to process the tweet stream. The other one is S4, which originated at Yahoo and is now an Apache project.

Predictive Analytic

I was driven to real time stream processing solutions by some of my predictive analytic projects, e.g., fraud analytics. Typical predictive analytic or data mining classification solutions have two phases. In the first phase, a predictive model is built using what's known as training data. Hadoop is an ideal fit for building the predictive model, which involves processing a massive amount of data in a scalable way.

In the second phase, we want to process a new record as soon as it arrives to make a prediction. In the case of a credit card transaction, we want to process a new transaction and predict whether or not it's a fraudulent transaction.

This is where real time stream processing solutions shine. In our case, the system will take the incoming record and use the predictive model built in the first phase to make a prediction for the new record.

Quick Tour of Storm

After reviewing Storm and S4, I decided to take Storm for a test drive. Here are some of the key concepts in Storm. In Storm, you have processing elements and data source elements. You can connect data sources and computation elements in any way you want to create a computation graph that solves the problem at hand.

Spout: A data source supplying a continuous stream of data.
Bolt: A computation element. Its input and output can be connected to multiple spouts and bolts.
Topology: A connected graph of spouts and bolts. It defines a Storm job.

A topology can have any number of spouts and bolts. A topology submitted to Storm keeps running until it's stopped explicitly.

Since I have some experience with Hadoop, I found it easier to grasp the Storm architecture by comparing it side by side with the artifacts of the Hadoop architecture. Here is the side by side comparison.

Storm: The master node runs a daemon called Nimbus, which orchestrates all jobs submitted to Storm. Hadoop: Similar to the Job Tracker.
Storm: Worker nodes run a daemon called Supervisor, which manages all Worker processes. Hadoop: Similar to the Task Tracker.
Storm: Workers are processes hosting Spouts and Bolts; Spouts and Bolts run as separate threads in the Worker process. Hadoop: Somewhat similar to Tasks, except that in Storm tasks run as threads.
Storm: Multiple stages in the processing pipeline. Hadoop: Only two stages in the processing pipeline, map and reduce.
Storm: No tight integration with a storage system. Hadoop: Tightly coupled to HDFS.
Storm: A topology does continuous processing. Hadoop: A job has a finite life span.

Although Storm is conceptually similar to Hadoop, there are significant differences. For example, Storm is not tightly integrated with any storage system, but there is nothing to stop you from accessing a database from a Spout or Bolt.

Click Stream Sequence Mining

To try out Storm, I implemented a Storm topology to detect patterns in the sequence of URLs accessed in a given session. Such detection could be useful as an aid to entice visitors to convert. For example, after a user has gone through the first or second page of a registration process, a chat popup could come up to offer the user help with the registration process, or the user could be offered a discount on the first purchase.

In my topology, I have a Spout that sends log records to a Bolt implemented by the UrlExtractorBolt class, which extracts the cookie and the URL field from the log record (a sketch of this Bolt appears below, after the Spout code). It then emits the cookie and URL, which get consumed by another Bolt, implemented by SequencePatternBolt, which maintains a map of cookie to URL list.

Any time the URL list gets updated, the Bolt tries to match it against a predefined set of URL sequences. If a match is found, it emits the cookie and the pattern name. Here is how the topology gets built and submitted to Storm. Including the spout, there are three stages in the processing pipeline.


public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("logSpout", new LogSpout(), 5);

    //connect to spout logSpout with shuffle grouping
    builder.setBolt("extractor", new UrlExtractorBolt(), 5)
        .shuffleGrouping("logSpout");

    //connect to bolt extractor with field grouping on field cookie
    builder.setBolt("sequence", new SequencePatternBolt(), 8)
        .fieldsGrouping("extractor", new Fields("cookie"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
        //submit topology to the cluster
        conf.setNumWorkers(2);
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
        //submit in local mode
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("test");
        cluster.shutdown();
    }
}

The field grouping ensures that tuples with the same cookie get assigned to the same instance of the sequence Bolt. This is similar to the hash partitioning of mapper output keys in Hadoop. The last argument of setBolt() is the degree of parallelism. In this example, the extractor Bolt executes in 5 threads and the sequence Bolt executes in 8 threads.

Here is the nextTuple() method of LogSpout, which emits the next tuple into the topology for processing. The data is read off a message queue.

public void nextTuple() {
    String record = readQueue();
    collector.emit(new Values(record));
}
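
The nextTuple() method above is only part of the Spout. A minimal sketch of the rest of LogSpout might look like the following, assuming the pre-Apache backtype.storm API (imports omitted, as in the other snippets); the BaseRichSpout base class, the collector field and the output field name record are assumptions, and readQueue() stands for whatever reads the message queue.

public class LogSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    //hold on to the collector so that nextTuple() can emit into the topology
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    //declare the single output field carrying the raw log record
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("record"));
    }
}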

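The UrlExtractorBolt is not listed in this post, but a minimal sketch could look like the following. The log record format, the delimiter and the field positions are purely hypothetical; the important part is that the declared output fields (cookie, url) match the Fields("cookie") used in the fieldsGrouping() call shown earlier.

public class UrlExtractorBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        //assume a delimited log record; the cookie and url positions are hypothetical
        String[] fields = input.getString(0).split(",");
        String cookie = fields[0];
        String url = fields[1];
        collector.emit(new Values(cookie, url));
    }

    //field names must match what the downstream fieldsGrouping expects
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("cookie", "url"));
    }
}
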
Here is the execute() method of the sequence Bolt. Because of the fields grouping, all data for the same cookie gets processed by the same instance of the sequence Bolt.


public void execute(Tuple input) {
    String cookie = input.getString(0);
    String url = input.getString(1);

    //get current list of urls for this cookie and add the new url
    List<String> urls = urlSeq.get(cookie);
    if (null == urls) {
        urls = new ArrayList<String>();
        urlSeq.put(cookie, urls);
    }
    urls.add(url);

    //check if matched with any predefined url sequence pattern
    String pattern = matchSequence(urls);
    if (null != pattern) {
        collector.emit(new Values(cookie, pattern));
        urls.clear();
    }
}
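
The urlSeq map and matchSequence() are not shown above. One way they could be defined is as instance state of SequencePatternBolt; a plain per-instance HashMap is enough because the field grouping routes all tuples for a given cookie to the same Bolt instance. The pattern representation and the matching rule sketched here (a pattern matches when it forms the tail of the visitor's URL list) are assumptions, not necessarily how the actual implementation works.

//per-cookie URL history; safe as instance state because of the field grouping
private Map<String, List<String>> urlSeq = new HashMap<String, List<String>>();

//predefined URL sequences keyed by pattern name, loaded in prepare(); hypothetical representation
private Map<String, List<String>> patterns = new HashMap<String, List<String>>();

//return the name of the first pattern that forms the tail of the url list, or null if none matches
private String matchSequence(List<String> urls) {
    for (Map.Entry<String, List<String>> entry : patterns.entrySet()) {
        List<String> pattern = entry.getValue();
        int offset = urls.size() - pattern.size();
        if (offset >= 0 && urls.subList(offset, urls.size()).equals(pattern)) {
            return entry.getKey();
        }
    }
    return null;
}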

Summing up

Storm is a powerful tool with a very flexible and scalable computation model for real time processing. You can get more details on Storm here. The Storm wiki on GitHub is also a good resource. If you are curious, you can find details of S4 on the Apache web site.

About Pranab

I am Pranab Ghosh, a software professional in the San Francisco Bay area. I manipulate bits and bytes for the good of living beings and the planet. I have worked with a myriad of technologies and platforms in various business domains, for early stage startups, large corporations and anything in between. I am an active blogger and open source project owner. I am passionate about technology and green and sustainable living. My technical interest areas are Big Data, Distributed Processing, NOSQL databases, Machine Learning and Programming languages. I am fascinated by problems that don't have a neat closed form solution.

12 Responses to Big Data Caught in Storm

  1. Roger Rea says:

    Pranab … You describe S4 and Storm as "promising products." Yet, Yahoo S4 is Alpha code – V0.3.0. Twitter Storm is also in very early stages of development. Another option for realtime analytic processing is IBM InfoSphere Streams, which became commercially available 3 years ago. It was based on 6 years of prior research between IBM Research and the US Government. Trial code is available from ibm.com in the trials and demos portal. Sample code in the DeveloperWorks Streams Exchange is now available for log analysis, video processing and more. Support for the Predictive Model Markup Language is included in the product. Articles on IBM DeveloperWorks describe how to take SPSS Modeler models and deploy them to Streams for real time data mining, aka mining in microseconds. If you are truly interested in production applications of big data in real time, you owe it to yourself to take a look at Streams. Or, if you're at the Strata Conference this week, come by and talk to us.

  2. Pranab says:

    Roger
    I am aware of IBM InfoSphere Streams. My focus is on open source tools, and that's the reason I excluded IBM InfoSphere and other commercial stream processing products. I know S4 and Storm are early stage products. That's why I called them "promising". Storm is used by Twitter in production though.

  3. Jeryl Cook says:

    Nice article. It would be great to see performance comparisons between S4 and Storm… do you have any performance writeup?

    I really believe IBM InfoSphere Streams needs an "open source" version of this product… I am sure it would outshine S4 and Storm due to its maturity and company backing by IBM.

  4. Pranab says:

    @Jeryl Thanks. Unfortunately, I don’t have any performance benchmark for Storm and S4.

  5. Pingback: Big Data Caught in Storm | BigDataCloud.com

  6. Mauricio G. says:

    Hi Pranab, nice article!

    I’m doing my first tests in Storm, and your article is a very good proof of concept.
    Can you share the source code for this article? I have some doubts; for example, where is the variable urlSeq defined?

    Thanks,
    Mauricio G.
    Chile

  7. Ajay says:

    Pranab,
    Thanks. Good article for getting an understanding of Storm and its comparison with Hadoop. Do you suggest that we have both Hadoop and Storm as part of the architecture, with Storm where streaming is required for real time processing, and Hadoop as well? Or are you suggesting that if there is real time processing, then we replace Hadoop with Storm? It would be great if you could share an image of an architecture that includes Storm.

    • Pranab says:

      Ajay
      I wouldn’t go so far as to say Storm replaces Hadoop. For offline scalable batch processing, Hadoop is still the king. Typically they would coexist. Here is an example.

      For fraud analytics, you might build some kind of model by doing batch processing of historical data using Hadoop, and then use that model in Storm for real time detection of fraud.

    • Jeryl Cook says:

      Hadoop is for batch processing: queries are run on data that is already stored somewhere. Storm is a streaming engine that processes data as it comes in continuously; you wouldn't 'store' data that Storm processes. They are really not competing projects; it just depends on the requirements.

  8. näsplastik says:

    Having read this I thought it was really
    informative. I appreciate you finding the time and energy to put this short article together.
    I once again find myself personally spending way too much time both reading and posting comments.
    But so what, it was still worthwhile!

  9. Pingback: Real Time Fraud Detection with Sequence Mining | Mawazo

  10. Pingback: Real Time Fraud Detection with Sequence Mining | Big Data Cloud
