Cassandra secondary index to the rescue


In an earlier post, I discussed a Cassandra data model for a BPM system, using an eCommerce order processing system driven by BPM as the example. In this post, we will discuss some data access scenarios and the issues associated with them.

Query with secondary index

Cassandra provides indexed access only through the primary key, which is the row key. But in a real application, we almost always need to query by other columns. Unfortunately, secondary indexes don’t exist in Cassandra out of the box, not until the next version anyway.

The query support in Cassandra version 0.7 is limited to hash indexes. So for many applications requiring more complex indexes, you are on your own.

What if you want an ordered index for range queries, or a composite index involving multiple fields?

The specific scenario that this post will center around is as follows. When an inventory allocation request is sent to the warehouse through web services, the warehouse returns an inventory request Id in the synchronous response. This response is just an acknowledgement of the fact that the request has been received by the warehouse inventory management system.

The inventory management system runs its allocation process in batch mode every hour, using complex optimization algorithms, for all the requests it received during that hour. As a result, the order processing system receives the actual inventory allocation response later, asynchronously. The response contains the inventory request Id that came back earlier during the first call. With the inventory request Id, we have to correlate back to the corresponding process. The response, in JSONish notation, is as below.

 
{ 
  inventoryRequestId : xxx 
  items : 
  [ 
    { 
      sku: xxx 
      quantityRequested : xxx 
      quantityAllocated : xxx 
    }, 
    { 
      sku: xxx 
      quantityRequested : xxx 
      quantityAllocated : xxx 
    }, 
    ...... 
  ] 
}

Our challenge is to find the corresponding process Id, given the inventory request Id. We cannot afford to do a full scan of the Process column family, looking for the row that has a correlationEntityId equal to the inventory request Id.

Build the index

So we need to build an index for correlationEntityId, which will allow us to quickly look up the row key of the process instance, given the correlationEntityId. With the row key in hand, we can read the full row or selected columns of the row from the Process column family, depending on our need.

The BPM process will read the process data and, based on the process type, read the process metadata from the ProcessType column family. Based on the current state of the process and the event, it will find out what the next state should be. Next, the BPM will invoke the necessary callback handler and set the process state to the next state.
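The state lookup described above can be sketched as a small transition table. This is only an illustration: the class name TransitionTableSketch, the state and event names, and the in-memory maps are all assumptions of mine. In the actual system the transition metadata would come from the ProcessType column family.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative transition table: (current state, event) -> next state, plus a callback.
class TransitionTableSketch {
    private final Map<String, String> transitions = new HashMap<>();
    private final Map<String, Runnable> handlers = new HashMap<>();

    void addTransition(String state, String event, String nextState, Runnable handler) {
        String key = state + "|" + event;
        transitions.put(key, nextState);
        handlers.put(key, handler);
    }

    // Finds the next state, invokes the callback handler, and returns the next state.
    String fire(String currentState, String event) {
        String key = currentState + "|" + event;
        String next = transitions.get(key);
        if (next == null) {
            throw new IllegalStateException("No transition for " + key);
        }
        Runnable handler = handlers.get(key);
        if (handler != null) {
            handler.run();
        }
        return next;
    }

    public static void main(String[] args) {
        TransitionTableSketch table = new TransitionTableSketch();
        // Hypothetical states and event for the inventory allocation step
        table.addTransition("AWAITING_ALLOCATION", "ALLOCATION_RECEIVED", "ALLOCATED",
                () -> System.out.println("handling allocation response"));
        System.out.println(table.fire("AWAITING_ALLOCATION", "ALLOCATION_RECEIVED"));
    }
}
```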

Enough of that digression. Let’s go back to our index. Database systems generally have two kinds of indexes, hashed and ordered. For simple queries based on equality, a hashed index is the way to go. The secondary key is hashed, and the hash bucket contains a list of values. In our case each value is a (secondary key, row key) tuple. We look for a matching secondary key and find the corresponding row key of the indexed entity we are after. I am assuming that the secondary keys are unique. If not, there may be multiple row keys for a given secondary key.
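As a rough illustration of the hashed index idea, the sketch below keeps the (secondary key, row key) pairs in an in-memory java.util.HashMap. The class name HashedIndexSketch and the sample Ids are hypothetical, and this says nothing about how Cassandra stores data; it only shows the equality lookup pattern under the unique-key assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal in-memory sketch of a hashed secondary index (illustrative only).
class HashedIndexSketch {
    // secondary key -> row key; assumes secondary keys are unique
    private final Map<String, String> index = new HashMap<>();

    void put(String secondaryKey, String rowKey) {
        index.put(secondaryKey, rowKey);
    }

    // Returns the row key for the secondary key, or null if none is indexed.
    String lookup(String secondaryKey) {
        return index.get(secondaryKey);
    }

    public static void main(String[] args) {
        HashedIndexSketch idx = new HashedIndexSketch();
        idx.put("invReq-1001", "process-42");
        System.out.println(idx.lookup("invReq-1001")); // prints process-42
        System.out.println(idx.lookup("invReq-9999")); // prints null
    }
}
```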

For so-called range queries, e.g., queries involving a greater than or less than operator, indexes need to be ordered. Typically, ordered indexes are built with a balanced search tree, e.g., a B+ tree.
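To make the contrast with a hashed index concrete, here is a minimal sketch of an ordered index answering a range query. It uses java.util.TreeMap, which is a red-black tree rather than a B+ tree, as a stand-in for the ordered structure. The class name OrderedIndexSketch and the integer keys are assumptions for illustration, and secondary keys are again assumed unique.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustrative ordered index: keys are kept sorted, so range scans are cheap.
class OrderedIndexSketch {
    // secondary key -> row key, in ascending key order
    private final TreeMap<Integer, String> index = new TreeMap<>();

    void put(int secondaryKey, String rowKey) {
        index.put(secondaryKey, rowKey);
    }

    // Row keys whose secondary key k satisfies low <= k < high, in key order.
    List<String> range(int low, int high) {
        return new ArrayList<>(index.subMap(low, true, high, false).values());
    }

    public static void main(String[] args) {
        OrderedIndexSketch idx = new OrderedIndexSketch();
        idx.put(50, "row-a");
        idx.put(120, "row-b");
        idx.put(300, "row-c");
        System.out.println(idx.range(100, 400)); // prints [row-b, row-c]
    }
}
```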

In our case we are interested in equality-based queries only. For a given correlationEntityId, we want to look up the row key of the corresponding process, which is the processId. Moreover, correlationEntityId, our secondary key, is unique. By that I mean that the association between the correlationEntityId and the processId is one to one.

We will use a separate column family for storing our index. We will create and store an index entry when we first receive the inventory request Id from the warehouse inventory system. A typical row of the index column family is as below. The row key for the index column family is a hash of the correlation type and correlation Id combination.

row key:             row_key
super column names:  cor_type1, cor_type2
column names:        cor_id1, cor_id2, cor_id3, cor_id4, cor_id5
column values:       proc_id1, proc_id2, proc_id3, proc_id4, proc_id5

The row key is hash(cor_type:cor_id). This is a column family with super columns, one for each correlation type. Each super column contains a number of columns, one for each correlation Id and process Id pair.

The column names are cor_id1, cor_id2, and so on. They are the secondary key values, i.e., the different values of the correlation Id (e.g., inventory request Id). The column values proc_id1, proc_id2, and so on are the row keys of the Process column family.
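One way to picture this layout is as three nested maps: row key, then super column, then column. The sketch below models that shape in memory. The class IndexRowModel and its method names are hypothetical and have nothing to do with the Cassandra client API; it is only meant to show which level holds what.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of the index column family layout (illustrative only).
class IndexRowModel {
    // row key -> super column (correlation type) -> column name (correlation Id) -> process Id
    private final Map<String, Map<String, Map<String, String>>> rows = new HashMap<>();

    void put(String rowKey, String corType, String corId, String processId) {
        rows.computeIfAbsent(rowKey, k -> new HashMap<>())
            .computeIfAbsent(corType, k -> new HashMap<>())
            .put(corId, processId);
    }

    // Returns the process Id stored at (rowKey, corType, corId), or null if absent.
    String get(String rowKey, String corType, String corId) {
        Map<String, Map<String, String>> superCols = rows.get(rowKey);
        if (superCols == null) {
            return null;
        }
        Map<String, String> cols = superCols.get(corType);
        return cols == null ? null : cols.get(corId);
    }

    public static void main(String[] args) {
        IndexRowModel model = new IndexRowModel();
        model.put("17", "inventoryRequest", "cor_id1", "proc_id1");
        System.out.println(model.get("17", "inventoryRequest", "cor_id1")); // prints proc_id1
    }
}
```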

The process to create the index is as follows.

Here I am hashing the combination of correlation type and correlation Id into one of 100 hash values, one for each row. Essentially, I am distributing the index entries over 100 rows around the Cassandra cluster.

Even if all the index entries could fit in one row, that would not have been a great idea, because it would have created a hot spot in the cluster.


// Adds an index entry that maps (correlationType, correlationId) to processId
private void indexByCorrelationId(String correlationType, String correlationId,
        String processId) throws Exception {
    String rowKey = getCorrelationRowKey(correlationType, correlationId);

    // One column: name is the correlation Id, value is the process Id
    ColumnValue colVal = new ColumnValue();
    colVal.write(correlationId, processId);
    List<ColumnValue> colValues = new ArrayList<ColumnValue>();
    colValues.add(colVal);

    // One super column, named after the correlation type
    SuperColumnValue superColVal = new SuperColumnValue();
    superColVal.setName(Util.getBytesFromString(correlationType));
    superColVal.setValues(colValues);
    List<SuperColumnValue> superColVals = new ArrayList<SuperColumnValue>();
    superColVals.add(superColVal);

    // Do the insert
    DataManager.insertSuperColumns(KEYSPACE, COLUMN_FAMILY_CORR_INDEX, rowKey,
            superColVals);
}

private String getCorrelationRowKey(String correlationType, String correlationId)
        throws Exception {
    // Hash the combination of correlationType and correlationId
    String rowKeySeed = correlationType + correlationId;
    byte[] seedBytes = Util.getBytesFromString(rowKeySeed);
    int sum = 0;
    for (byte b : seedBytes) {
        // Java bytes are signed; mask so non-ASCII bytes cannot make the sum negative
        sum += b & 0xFF;
    }

    // Distribute over 100 hash buckets, each one a row
    return String.valueOf(sum % 100);
}
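For readers who want to try the bucketing on its own, here is a self-contained sketch of the same byte-sum hash, without the data access API. The class name BucketHashSketch, the UTF-8 encoding, and the sample Ids are my assumptions for the illustration. The main method simply counts how many of the 100 buckets a run of made-up Ids touches.

```java
import java.nio.charset.StandardCharsets;

// Standalone version of the byte-sum bucketing used for the index row key.
class BucketHashSketch {
    static final int NUM_BUCKETS = 100;

    // Sums the unsigned byte values of (type + id) and reduces modulo the bucket count.
    static int bucket(String correlationType, String correlationId) {
        byte[] seed = (correlationType + correlationId).getBytes(StandardCharsets.UTF_8);
        int sum = 0;
        for (byte b : seed) {
            sum += b & 0xFF; // treat bytes as unsigned so the sum never goes negative
        }
        return sum % NUM_BUCKETS;
    }

    public static void main(String[] args) {
        int[] counts = new int[NUM_BUCKETS];
        for (int i = 0; i < 10000; i++) {
            counts[bucket("inventoryRequest", "req-" + i)]++; // hypothetical Ids
        }
        int used = 0;
        for (int c : counts) {
            if (c > 0) {
                used++;
            }
        }
        System.out.println("buckets used: " + used);
    }
}
```

A byte sum depends only on which characters appear, not on their order, so Ids such as "ab" and "ba" always share a bucket. If the spread turns out to be uneven for your Ids, a stronger hash function could be swapped in without changing the rest of the scheme.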

I am using a simple Cassandra data access API, discussed in a separate post of mine. The process to query by index is as follows. It looks up by row key (hashed correlation type and correlation Id), super column name (correlation type), and column name (correlation Id).

// Returns the process Id correlated with (correlationType, correlationId)
public String findCorrelatedProcess(String correlationType, String correlationId)
        throws Exception {
    String rowKey = getCorrelationRowKey(correlationType, correlationId);
    byte[] superColName = Util.getBytesFromString(correlationType);
    byte[] colName = Util.getBytesFromString(correlationId);

    ColumnValue colVal = DataManager.retrieveSubColumn(KEYSPACE,
            COLUMN_FAMILY_CORR_INDEX, rowKey, superColName, colName);

    // Guard in case the data access layer returns null for a missing column
    if (colVal == null) {
        return null;
    }
    return Util.getStringFromBytes(colVal.getValue());
}
}

The hashing we are doing for the row key is not to be confused with Cassandra’s hash partitioning. The purpose of Cassandra’s hashing of the row key is partitioning, i.e., locating the cluster nodes where the corresponding row resides.
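The two levels of hashing can be put side by side in a small sketch. The first method is the application-level bucketing from this post. The second only approximates what a hash partitioner does, assuming the RandomPartitioner, which derives a token from an MD5 digest of the row key. The class name TwoLevelHashSketch and the UTF-8 encoding are my assumptions, and the token method is not Cassandra code.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Contrasts application-level bucketing with partitioner-style hashing (illustrative).
class TwoLevelHashSketch {
    // Level 1 (application): correlation type + Id -> one of 100 index row keys.
    static String indexRowKey(String corType, String corId) {
        int sum = 0;
        for (byte b : (corType + corId).getBytes(StandardCharsets.UTF_8)) {
            sum += b & 0xFF;
        }
        return String.valueOf(sum % 100);
    }

    // Level 2 (cluster): rough approximation of an MD5-based token for a row key.
    static BigInteger token(String rowKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(rowKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(digest).abs();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        String rowKey = indexRowKey("inventoryRequest", "req-1001"); // hypothetical Id
        System.out.println("index row key: " + rowKey);
        System.out.println("approximate token: " + token(rowKey));
    }
}
```

The first hash decides which index row an entry goes into. The second decides which nodes hold that row, and the application never computes it.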

Concluding thoughts

Here is a nice post on Cassandra secondary indexes, in case you are interested. You should design your indexes based on your application’s needs. The one in this post is pretty basic and simple.

Index management involves a lot of work when data changes. In our case, after an order is completely processed, we no longer need to look up by the correlationEntityId, so the corresponding column in the index column family row can be removed. Our index entries never change; they simply become obsolete after an order is processed.
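The life cycle of an index entry described here can be sketched as three operations: add on acknowledgement, look up on the asynchronous response, remove on completion. The sketch below uses a flat in-memory map, and the class and method names are hypothetical. Against Cassandra, the last step would be a column delete on the index row.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative life cycle of an index entry: add, look up, remove when obsolete.
class IndexLifecycleSketch {
    // "correlationType:correlationId" -> process Id
    private final Map<String, String> index = new HashMap<>();

    // Called when the warehouse acknowledges the request with a correlation Id.
    void onRequestAcknowledged(String corType, String corId, String processId) {
        index.put(corType + ":" + corId, processId);
    }

    // Called when the asynchronous response arrives; returns null if nothing is indexed.
    String correlate(String corType, String corId) {
        return index.get(corType + ":" + corId);
    }

    // Called once the order is completely processed; returns true if an entry was removed.
    boolean onOrderCompleted(String corType, String corId) {
        return index.remove(corType + ":" + corId) != null;
    }

    public static void main(String[] args) {
        IndexLifecycleSketch idx = new IndexLifecycleSketch();
        idx.onRequestAcknowledged("inventoryRequest", "req-1001", "process-42");
        System.out.println(idx.correlate("inventoryRequest", "req-1001")); // prints process-42
        idx.onOrderCompleted("inventoryRequest", "req-1001");
        System.out.println(idx.correlate("inventoryRequest", "req-1001")); // prints null
    }
}
```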

Sometimes it is helpful to cache certain columns along with the index, so that a second query against the indexed column family is not necessary.

With our example, we could store the current state and the last state transition time of the process along with the row key, by having the index column family contain one super column per correlation entity Id. The structure of the index column family row would be as below.

 
{ 
  hashed_cor_entity: //row 
  [ 
    cor_entity_id1 : //super column 
    { 
      process_id : xxx 
      cur_state : xxx 
      last_transition : xxx 
    }, 
    cor_entity_id2 : //super column 
    { 
      process_id : xxx 
      cur_state : xxx 
      last_transition : xxx 
    }, 
    ....... 
  ] 
}
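A minimal sketch of such an index entry with cached columns is below. The class names and the use of epoch milliseconds for the transition time are assumptions for the illustration. The field names follow the structure above.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative index whose entries cache a few process columns next to the row key.
class CachedIndexSketch {
    static final class Entry {
        final String processId;
        final String curState;
        final long lastTransitionMillis;

        Entry(String processId, String curState, long lastTransitionMillis) {
            this.processId = processId;
            this.curState = curState;
            this.lastTransitionMillis = lastTransitionMillis;
        }
    }

    // correlation entity Id -> cached entry
    private final Map<String, Entry> byCorrelationId = new HashMap<>();

    void put(String corEntityId, Entry entry) {
        byCorrelationId.put(corEntityId, entry);
    }

    // Answers the state question from the index alone, with no second read.
    String currentState(String corEntityId) {
        Entry e = byCorrelationId.get(corEntityId);
        return e == null ? null : e.curState;
    }

    public static void main(String[] args) {
        CachedIndexSketch idx = new CachedIndexSketch();
        // Hypothetical Ids and state name
        idx.put("req-1001", new Entry("process-42", "AWAITING_ALLOCATION", 0L));
        System.out.println(idx.currentState("req-1001")); // prints AWAITING_ALLOCATION
    }
}
```

The price of this shortcut is that the cached columns have to be rewritten on every state transition, so the index is no longer write-once.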

In this post, my focus was on a simple hashed index. In a future post, I will cover ordered and composite indexes, mostly by leveraging Cassandra’s column sorting. So, stay tuned.

To learn about the Cassandra 0.7.x secondary index, this is a good start. There is an interesting comment there about the native Cassandra secondary index: it should only be used for columns with low cardinality, i.e., columns with few unique values. I tried to understand the reason behind this limitation, but so far I have no idea. Maybe the Cassandra source code will provide some clues. Documentation on this topic is almost nonexistent.

About Pranab

I am Pranab Ghosh, a software professional in the San Francisco Bay area. I manipulate bits and bytes for the good of living beings and the planet. I have worked with myriad of technologies and platforms in various business domains for early stage startups, large corporations and anything in between. I am an active blogger and open source project owner. I am passionate about technology and green and sustainable living. My technical interest areas are Big Data, Distributed Processing, NOSQL databases, Machine Learning and Programming languages. I am fascinated by problems that don't have neat closed form solution.