Log Analysis and Incident Reporting with Hadoop


What do we have

We have a very successful eCommerce site. We get a lot of traffic, which is good. Our visitors are busy buying our cool products, which is even better.

What do we want

We want to do even better. We want to mine our enormous log files for our benefit. A specific example might be detecting incomplete customer registrations, so that our usability folks can review the account registration pages, identify any usability issues based on what the log records show, and improve the design.

Another example might be identifying abandoned shopping carts and finding the root cause of the abandonment. It could be related to performance and high latency, or to the user not being able to navigate seamlessly back and forth between the product catalog and the shopping cart. Once we find the abandoned carts, we may want customer service to send friendly emails to those users offering help in completing their purchases.

How do we get there

What is the problem, you might say; just write a script to analyze the log files and look for the patterns of interest. That makes sense when dealing with a small amount of data, and I have done exactly that in past projects where I only dealt with one log file at a time.

Now consider this. We want to analyze the last month’s log records. On average we have 100,000 visits per day, with about 100 log records per visit or session, and a typical log record is around 100 bytes. That works out to 100,000 x 100 x 100 bytes, roughly 1 GB per day, or about 30 GB of data per month. We also use the rolling log file pattern with a file size limit of 250 MB, so the 30 GB of data will be spread out across roughly 120 files.

It’s futile to even think about processing this data in one monolithic application running on one server. Fortunately, Hadoop and Map Reduce come to our rescue. Map Reduce is a simple yet powerful parallel, distributed processing model based on a shared-nothing functional programming model and the divide and conquer pattern.

We would like to group our analysis results by the cookie id, which identifies a unique visit or session. Analysis by session is important for various reasons: to find interesting patterns, we need the log records grouped by session.

Each line of our log has the following format:

  1. Timestamp
  2. Message

The time stamp field is followed by white space, and the rest of the line is the actual log message. The log message always has the text cookieId=xxxx embedded in it. Additionally, for a logged-in user, the log message will also have userId=xxxx embedded in it.
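For illustration, a log line for a logged-in user might look like this; the field values and message wording are made up, only the cookieId and userId tokens matter for our purposes.

2012-03-15 10:22:31,042 INFO checkout page rendered cookieId=8f2e41ab9c userId=12345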

The first thing we need to do is download all the log files from the production server and copy them into HDFS. Hadoop will break them up into blocks and distribute them over the Hadoop cluster. Each block of each file may also be replicated, depending on the configured replication factor for HDFS.
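The copy itself can be done with the hadoop fs -put shell command, or programmatically through the HDFS API. Here is a minimal sketch of the latter; the LogLoader class name and the local and HDFS paths are made-up examples.

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LogLoader {
  public static void main(String[] args) throws Exception {
    // local log directory and HDFS target directory are examples only;
    // hadoop fs -put achieves the same thing from the shell
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    for (File f : new File("/var/log/ecommerce").listFiles()) {
      fs.copyFromLocalFile(new Path(f.getAbsolutePath()),
          new Path("/data/logs/" + f.getName()));
    }
  }
}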

Our first consideration is how to group the records. If we simply group by cookie in our map output, i.e., emit the cookie as the key, it won’t be enough, because the values for a key, which are the log records, are handed to the reducer in no particular order. However, we would like the log records for a particular cookie to be chronologically ordered. The keys, i.e., the cookies in our case, are sorted by Hadoop, but that alone is not what we are after. We want to group by cookie and secondary sort by time stamp.

To achieve secondary sorting, our key for the map output will be a combination of the cookie followed by the time stamp. We will use a class called TextLongPair for our key, which consists of a Text object for the cookie and the time stamp as a long, in milliseconds. It implements the WritableComparable interface.
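Here is a minimal sketch of what such a key class might look like. The constructor, the accessors and the use of LongWritable for the time stamp are assumptions, chosen to line up with the comparator and partitioner code later in this post.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextLongPair implements WritableComparable<TextLongPair> {
  private Text first = new Text();                   // cookie id
  private LongWritable second = new LongWritable();  // time stamp in ms

  public TextLongPair() {
  }

  public TextLongPair(String cookie, long timeInMillis) {
    first.set(cookie);
    second.set(timeInMillis);
  }

  public Text getFirst() {
    return first;
  }

  public LongWritable getSecond() {
    return second;
  }

  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  public int compareTo(TextLongPair other) {
    // cookie first, then time stamp
    int cmp = first.compareTo(other.first);
    return cmp != 0 ? cmp : second.compareTo(other.second);
  }

  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  public boolean equals(Object o) {
    if (o instanceof TextLongPair) {
      TextLongPair other = (TextLongPair) o;
      return first.equals(other.first) && second.equals(other.second);
    }
    return false;
  }
}

With the key class in place, our map method is as follows.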

public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  List<String> tokens = parse(value);            // split line into time stamp and message
  String timeStamp = tokens.get(0);
  long timeInMillis = getTime(timeStamp);        // convert time stamp to ms since epoch
  String msg = tokens.get(1);
  String cookie = parseCookie(msg);              // extract cookieId=xxxx
  String userId = parseUser(msg);                // extract userId=xxxx, if present
  TextLongPair outKey = new TextLongPair(cookie, timeInMillis);
  context.write(outKey, value);
}

Here is a rundown of the map method. We extract the time stamp and the message from the log record, convert the time stamp to time in milliseconds, and extract the cookie from the message. We then create a composite key from the cookie and the time in milliseconds, and finally emit our output.

When the map outputs are shuffled by Hadoop, they are partitioned by key and, within each partition, sorted by key. Since we have a custom composite key, we need to tell Hadoop how to sort the keys by providing a key comparator class.

public static class KeyComparator extends WritableComparator {
  protected KeyComparator() {
    super(TextLongPair.class, true);
  }
  public int compare(WritableComparable w1, WritableComparable w2) {
    TextLongPair tl1 = (TextLongPair) w1;
    TextLongPair tl2 = (TextLongPair) w2;
    // primary sort on the cookie
    int cmp = tl1.getFirst().compareTo(tl2.getFirst());
    if (cmp == 0) {
      // secondary sort on the time stamp
      cmp = tl1.getSecond().compareTo(tl2.getSecond());
    }
    return cmp;
  }
}

We compare both components of the key. If the first components are equal, we compare the second components. So the keys are primary sorted by cookie and secondary sorted by time stamp.

We also need to have control over key partitioning, because we want all the records for a cookie to be forwarded to the same reducer.

The default Hadoop hash key partitioning won’t guarantee that. Here is the partition method of our partitioner to override the default behavior.

public int getPartition(TextLongPair key, Text value, int numPartitions) {
  // partition on the cookie only, masked to a small non-negative value
  return (key.getFirst().hashCode() & 127) % numPartitions;
}

We take the hash code of the cookie, mask it down to a small non-negative value (the lower 7 bits here), and take the modulo with the number of partitions. The number of partitions equals the number of reducers set for the Hadoop job, which in turn should be based on the amount of data and the number of nodes in the Hadoop cluster.

So far we have the log records partitioned by cookie and, for a given cookie, sorted by time stamp. But we are not quite done yet. The remaining issue has to do with reducer calls.

A reducer call gets all the values for a given key group. By default in Hadoop there is one key group per distinct key. If we don’t change that behavior, a reducer call will get only one log record per key, since each key is a unique cookie and time stamp pair.

The behavior we want is for all the log records for a given cookie to be delivered in one reducer call, and to that effect we provide Hadoop with the following group comparator class. It considers only the cookie part of the key when grouping, so all the log records for a cookie are sent to the reducer in a single call.

public static class GroupComparator extends WritableComparator {
  protected GroupComparator() {
    super(TextLongPair.class, true);
  }
  public int compare(WritableComparable w1, WritableComparable w2) {
    TextLongPair tl1 = (TextLongPair) w1;
    TextLongPair tl2 = (TextLongPair) w2;
    // group solely on the cookie; the time stamp is ignored
    int cmp = tl1.getFirst().compareTo(tl2.getFirst());
    return cmp;
  }
}
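To tie these pieces together, the job driver has to register the custom key class, the sort comparator, the grouping comparator and the partitioner. Here is a minimal sketch; LogAnalysisDriver, LogAnalysisMapper, LogAnalysisReducer and CookiePartitioner are assumed names for the classes described in this post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogAnalysisDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "log incident analysis");   // Job.getInstance(conf, ...) on newer Hadoop
    job.setJarByClass(LogAnalysisDriver.class);

    job.setMapperClass(LogAnalysisMapper.class);           // the mapper shown above
    job.setReducerClass(LogAnalysisReducer.class);          // the reducer shown below
    job.setPartitionerClass(CookiePartitioner.class);       // partitions on the cookie only
    job.setSortComparatorClass(KeyComparator.class);        // full key: cookie, then time stamp
    job.setGroupingComparatorClass(GroupComparator.class);  // group reducer calls by cookie only

    job.setMapOutputKeyClass(TextLongPair.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));    // HDFS input directory
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // HDFS output directory
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}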

The remaining piece of the puzzle is the reduce method, which is as follows.

protected void reduce(TextLongPair key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
  List<String> logs = new ArrayList<String>();
  for (Text value : values) {
    logs.add(value.toString());          // all log records for one cookie, in time order
  }
  List<String> incidentLogs = findIncidents(logs);
  if (null != incidentLogs) {
    Text val = new Text();
    for (String log : incidentLogs) {
      val.set(log);
      context.write(NullWritable.get(), val);
    }
  }
}

We send all the log records for a cookie to the findIncidents() method, which parses the log records and determines whether there is an incident to be reported. When findIncidents() finds an incident, it collects a few records prior to the incident-identifying records and a few after them, and prepends the set with a special incident identifier record, as follows. This record acts as a boundary between incidents. If no incidents are found, the method returns null.

incident: abandoned shopping cart userId=12345

………… other log records capturing the context
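For concreteness, here is one shape findIncidents() could take, as a method in the reducer class (it uses java.util.List and java.util.ArrayList). This is purely an illustrative sketch: the pattern strings and the size of the context window are assumptions, and the real detection logic depends on what the application actually logs.

// Illustrative sketch only: flags an abandoned cart when the session adds
// items to the cart but never reaches checkout. The "addToCart" and "checkout"
// pattern strings and the window of 3 records on each side are assumptions.
private List<String> findIncidents(List<String> logs) {
  int lastAddToCart = -1;
  boolean checkedOut = false;
  for (int i = 0; i < logs.size(); i++) {
    if (logs.get(i).contains("addToCart")) {
      lastAddToCart = i;
    } else if (logs.get(i).contains("checkout")) {
      checkedOut = true;
    }
  }
  if (lastAddToCart < 0 || checkedOut) {
    return null;                                   // no incident in this session
  }
  // a few records before and after the record that identified the incident
  int from = Math.max(0, lastAddToCart - 3);
  int to = Math.min(logs.size(), lastAddToCart + 4);
  List<String> incident = new ArrayList<String>();
  // parseUser is the same helper used in the mapper, assumed to return the userId=xxxx token
  incident.add("incident: abandoned shopping cart " + parseUser(logs.get(lastAddToCart)));
  incident.addAll(logs.subList(from, to));
  return incident;
}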

All the reducer output files will contain incident data in this format. The data can then be pulled back out of HDFS into the local file system for further analysis.

That’s all for this post. As usual, comments are welcome.
