Bring some Spark into your life


Hadoop is a great cluster computing framework. But sometimes it may not be a great fit for the particular problem at hand. Or you may be suffering from Hadoop fatigue and want to explore other options. There are certain problems where you want to make multiple passes over the same data set iteratively. I run into them when implementing data mining algorithms. Another example is a typical analytic application where you want to aggregate facts over different dimensions of the same data set.

In this post, I will show how to implement the K-means clustering algorithm using Spark. Spark is another parallel cluster computing framework, like Hadoop. My example is based on the source code that comes with the Spark installation package in the examples directory, except for some minor changes. The focus here is not on the clustering algorithm but on the suitability of Spark for this kind of problem.

Why Not Hadoop

For iterative solutions in the Hadoop world, each iteration has to read the data from HDFS and then process it. You might be lucky and Hadoop will be able to meet your data locality requirement and assign your mapper task to the node hosting the particular split. You might be even luckier and your split data will be found in the OS page cache. But that's a lot of hope and prayer.

For many problems, you might also generate some intermediate data during each iteration that you need in subsequent iterations. In Hadoop there is no convenient way to retrieve that data. You have to resort to writing it to HDFS or ZooKeeper.

Why Spark

Spark is another cluster computing platform, which addresses these issues. A Spark RDD (Resilient Distributed Dataset) can optionally be cached for iterative processing. This is what brought me to Spark. In this section, I will give a brief overview of Spark, followed by a short code sketch. For details please refer to the Spark documentation.

  1. Spark is written in Scala. You write your logic as Scala closures, and the user-written closures get shipped across the cluster.
  2. An RDD is a read-only collection of records, partitioned and replicated across the cluster. It could represent a file or the output of some other operation.
  3. An RDD is ephemeral by default. It can be cached during the lifetime of an application. It can also be saved to persistent storage.
  4. You transform one RDD into another using the various Spark-provided transformation functions and your own closure logic.
  5. You can perform an action on an RDD, which returns a value to the driver, e.g. collect().
  6. Spark workers are long running processes that can store RDDs in RAM across operations and process them.
  7. Spark manages the lineage of an RDD. The lineage can be used to regenerate a lost RDD.
  8. Because of the ability to cache RDDs, Spark enables pipelining the output of one processing phase to the next. Pipelining is more efficient than materializing intermediate data on disk.
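
To make the concepts above concrete, here is a minimal sketch, not taken from the Spark examples: it assumes a local master, an input file named points.txt, and the pre-Apache "spark" package layout of the Spark version this post is based on.

import spark.SparkContext
import spark.SparkContext._

object RddSketch {
  def main(args: Array[String]) {
    //local master; the file name points.txt is made up for illustration
    val sc = new SparkContext("local", "RddSketch")

    //an RDD backed by a file
    val lines = sc.textFile("points.txt")

    //a transformation produces a new RDD; cache() keeps it in worker memory
    //so that subsequent passes do not reread the file
    val lengths = lines.map(_.length).cache()

    //actions return values to the driver
    val total = lengths.reduce(_ + _)
    val first = lengths.collect()(0)
    println("total characters: " + total + ", first line length: " + first)
  }
}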

In the Spark architecture, the cluster computing framework is decoupled from cluster resource management (the role the JobTracker plays in Hadoop). Cluster resource management is delegated to a framework called Mesos.

You can run Spark in local mode or in clustered mode. In clustered mode it leverages Mesos for cluster-wide resource management.

K-means Clustering

It's time for an example. The idea behind K-means clustering is simple. It's a special case of a broad category of solutions known as Expectation Maximization (EM). The computational steps are as follows:

  1. Guess an initial set of cluster positions.
  2. Assign each data point to its nearest cluster.
  3. Recalculate each cluster position based on its member values.
  4. Go back to step 2 and repeat until convergence.

Here is the main driver in Scala for the Spark-based solution:

import scala.collection.mutable.HashMap
import spark.SparkContext
import spark.SparkContext._   //imports assume the pre-Apache "spark" package layout of the version this example is based on

//FeatureVector, parseFeatureVector() and closestPoint() are helpers not shown here
//(a sketch of them follows the code dissection below)
def main(args: Array[String]) {
  if (args.length < 5) {
    System.err.println("Please provide all arguments")
    System.exit(1)
  }

  val sc = new SparkContext(args(0), "SparkLocalKMeans")

  //input
  val lines = sc.textFile(args(1))

  //parse each line to a feature vector and cache the resulting RDD
  val data = lines.map(parseFeatureVector _).cache()

  //params
  val K = args(2).toInt
  val convergeDist = args(3).toDouble
  val maxIter = args(4).toInt

  //initial cluster centers, sampled from the data
  var points = data.takeSample(false, K, 42)
  var kPoints = new HashMap[Int, FeatureVector]
  var tempDist = 1.0
  for (i <- 1 to K) {
    kPoints.put(i, points(i - 1))
  }

  //iterate until the centers move less than convergeDist or maxIter is reached
  var i = 0
  while (tempDist > convergeDist && i < maxIter) {
    println("Next iteration: " + (i + 1))
    //pair each data point with its closest cluster
    var closest = data.map(p => (closestPoint(p, kPoints), (p, 1)))

    //sum the member points and counts per cluster
    var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}

    //recalculate cluster centers
    var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collect()

    //distance between existing and new cluster centers
    tempDist = 0.0
    for (pair <- newPoints) {
      tempDist += kPoints.get(pair._1).get.squaredDist(pair._2)
    }

    //reset cluster centers
    for (newP <- newPoints) {
      kPoints.put(newP._1, newP._2)
    }

    i = i + 1
    println("Next centers: " + kPoints)
  }

  println("Final centers: " + kPoints)
  System.exit(0)
}

Here is a dissection of the code snippet:

  1. The call to sc.textFile() realizes an RDD by reading the data off storage.
  2. The map over the lines, using parseFeatureVector(), converts each line to a feature vector, and cache() caches the resulting RDD.
  3. takeSample() draws K samples from the previously cached RDD to be used as the initial cluster estimates. The function takeSample() is an action, since it returns values to the driver.
  4. The while loop iterates until the cluster centers converge or the maximum number of iterations is reached.
  5. Inside the loop, a map over the previously cached RDD pairs each data point with its cluster. The function closestPoint() does the actual computation to find the closest cluster for a data point.
  6. The reduceByKey() call aggregates the member points of each cluster, from which the updated cluster centers are computed.
  7. Finally, we find the distance between the old and the new cluster centers, which is used as the convergence criterion.
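
The driver refers to a FeatureVector type and to the helpers parseFeatureVector() and closestPoint(), none of which are shown here. Below is a minimal sketch of what they could look like; the class layout and signatures are my assumptions, not code from the Spark examples.

import scala.collection.mutable.HashMap

//assumed feature vector with the operations the driver needs: +, / and squaredDist
class FeatureVector(val elements: Array[Double]) extends Serializable {
  def +(other: FeatureVector) =
    new FeatureVector(elements.zip(other.elements).map {case (a, b) => a + b})

  def /(count: Int) = new FeatureVector(elements.map(_ / count))

  def squaredDist(other: FeatureVector) =
    elements.zip(other.elements).map {case (a, b) => (a - b) * (a - b)}.sum

  override def toString = elements.mkString("(", ", ", ")")
}

//parse a whitespace separated line of numbers, e.g. "47.17003 27.56448"
def parseFeatureVector(line: String): FeatureVector =
  new FeatureVector(line.trim.split("\\s+").map(_.toDouble))

//return the key of the nearest cluster center in kPoints
def closestPoint(p: FeatureVector, kPoints: HashMap[Int, FeatureVector]): Int =
  kPoints.minBy {case (_, center) => p.squaredDist(center)}._1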

Location Data

I used location data, as latitude and longitude, to find clusters using the implementation from the last section. Here is a sample of the input:

47.17003 27.56448
47.17021 27.56443
47.16354 27.58189
62.6182 29.70866
62.59809 29.74448
62.59809 29.74448
62.59809 29.74448
62.59809 29.74448
62.60386 29.74453
.................

Clustering location data may be useful in many ways, e.g., traffic pattern modeling and analysis. Choosing the number of clusters in K-means clustering is challenging, but that's another topic for discussion. Here is the output for 4 and 6 clusters:

4 clusters
----------
3 -> (51.175886321839045, 21.22923979885059), 
4 -> (23.220128842975214, 119.69928512396693), 
1 -> (62.62441100498275, 28.889352388611904), 
2 -> (46.183877212336895, 27.033420071174362)

6 clusters
----------
5 -> (61.90192021205356, 23.33801447544643), 
3 -> (50.79415915915913, 21.33747981981983), 
4 -> (23.220128842975214, 119.69928512396693), 
1 -> (62.56704506602941, 29.689342401913652), 
6 -> (63.60643065288355, 29.589140565832405), 
2 -> (46.183877212336895, 27.033420071174362)
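
These runs correspond to passing 4 or 6 as the K argument. A hypothetical invocation, assuming the driver above is wrapped in an object named SparkLocalKMeans and using made-up values for the input path, convergence threshold and iteration cap, could look like this:

//arguments: master, input file, K, convergeDist, maxIter (values are illustrative)
SparkLocalKMeans.main(Array("local[2]", "locations.txt", "4", "0.001", "50"))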

What if Hadoop Were Used

To get a better perspective, let's take a look at what would need to be done for a Hadoop MapReduce based solution to the problem. It would go something like this:

  1. Write a MapReduce job to sample the data set. The mapper will sample the data based on the value of K. The reducer will output only the first K records. Instead of MapReduce, you could write a Java program to sample the input.
  2. Write a MapReduce job to find the nearest cluster for each data point. The mapper will read the output of the previous job (the cluster estimates) and store it in the cache. The reducer will recompute the cluster centers and write them out.
  3. The driver code will read the current and previous cluster estimates from HDFS and find the difference. If the difference is not within the threshold, go back to step 2 (a sketch of this driver-side check follows the list).
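
To make step 3 concrete, here is a minimal sketch of the driver-side convergence check a Hadoop-based solution would need; the part file format ("id<TAB>lat,lon") and the /kmeans/iter-N paths are made up for illustration.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

object ConvergenceCheck {
  //read cluster centers written by a reducer from all part files in a directory
  def readCenters(fs: FileSystem, dir: String): Map[Int, Array[Double]] = {
    val parts = fs.listStatus(new Path(dir)).filter(_.getPath.getName.startsWith("part-"))
    parts.flatMap {status =>
      Source.fromInputStream(fs.open(status.getPath)).getLines().map {line =>
        val Array(id, coords) = line.split("\t")
        (id.toInt, coords.split(",").map(_.toDouble))
      }
    }.toMap
  }

  def main(args: Array[String]) {
    val fs = FileSystem.get(new Configuration())
    val oldCenters = readCenters(fs, "/kmeans/iter-0")
    val newCenters = readCenters(fs, "/kmeans/iter-1")

    //total squared distance between old and new centers decides whether to iterate again
    val dist = newCenters.map {case (id, center) =>
      oldCenters(id).zip(center).map {case (a, b) => (a - b) * (a - b)}.sum
    }.sum
    println("converged: " + (dist < 0.01))
  }
}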

As you may have noticed, in step 2 the data points get reread from HDFS in each iteration. Another noteworthy point is that Hadoop has no way to return a value to the driver program. Even for small output, you have to write it to distributed storage, e.g., HDFS or ZooKeeper.

Summing Up

For iterative parallel computation, Spark is a great fit. It has been claimed to be 30 times faster than Hadoop for these kinds of problems. It was easy to install and set up. Functional programming with Scala added to the great experience I had with Spark.


About Pranab

I am Pranab Ghosh, a software professional in the San Francisco Bay area. I manipulate bits and bytes for the good of living beings and the planet. I have worked with myriad of technologies and platforms in various business domains for early stage startups, large corporations and anything in between. I am an active blogger and open source project owner. I am passionate about technology and green and sustainable living. My technical interest areas are Big Data, Distributed Processing, NOSQL databases, Machine Learning and Programming languages. I am fascinated by problems that don't have neat closed form solution.

4 Responses to Bring some Spark into your life

  1. Pingback: Spark and Shark In the News | Andy Konwinski

  2. Pingback: Spark, Shark, and BDAS In the News | Andy Konwinski

  3. Hardik says:

    awesome thanks for sharing, its very helpful

  4. blank says:

    Just letting you know if this is your authentic code….Cloudera is using it as part of their training materials…..and they didn’t really care to change much of it either….
