Cassandra Range Query Made Simple


In Cassandra, rows are hash partitioned by default. If you want data sorted by some attribute, the column name sorting feature of Cassandra is usually exploited. If you look at the Cassandra slice range API, you will find that you can specify only the range start, the range end and an upper limit on the number of columns fetched.

However, in many applications the need is to paginate through the data, i.e., each call should fetch a predetermined number of items.

There is no easy way to map the desired number of items to a column name range, particularly when column names are not evenly distributed.

The only option is to select a range such that the number of items expected to be returned is far greater than the max limit. In this post I will discuss an adaptive range reader that I have implemented recently in agiato, hosted on github. It borrows ideas from feedback control systems to adaptively change the column range so that a predetermined number of items is returned.

A Note on Cassandra Reads

Reads in Cassandra are more expensive than writes, which may sound counter intuitive. Cassandra maintains multiple versions of a column in a persistent on-disk data structure called the SSTable. There is also a corresponding in-memory data structure called the MemTable, which contains the latest writes.

During the read for a column, Cassandra has to reconcile the different versions across the MemTable and one or more SSTables to find the latest column value.

Digging into Slice Range

Let’s take an example of time series data and consider the model where the data is sharded by hour. In this model, a row contains an hour’s worth of data. The column family has only simple columns. There are no super columns.

Each data item is in a column, where the column name is a long holding the number of milliseconds since the start of the hour when the data item was generated. The column value is the actual data stored as a serialized byte stream. There could be up to 3.6 million columns in a row with a maximum data arrival rate of 1 per millisecond. The data model is shown below. The row key is of the format yyyy-mm-dd-hh. Here is a detailed blog on how to model time series data in Cassandra in case you are interested.

row key          column names (values below each name)
2012-01-20-14    ….   73765   73769   ….
                 ….   xxxx    xxxx    ….
2012-01-20-15    ….   71875   71879   ….
                 ….   xxxx    xxxx    ….
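
To make the mapping concrete, here is a small sketch of how a row key and a column name could be derived from an event timestamp under this model. The helper class and method names are hypothetical, purely for illustration; they are not part of agiato.

import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

//hypothetical helper illustrating the hourly sharded time series model
public class TimeSeriesKeys {
  private static final SimpleDateFormat HOUR_FORMAT = new SimpleDateFormat("yyyy-MM-dd-HH");
  static {
    HOUR_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
  }

  //row key: the hour bucket the event falls into, e.g. "2012-01-20-14"
  public static String rowKey(long eventTimeMs) {
    return HOUR_FORMAT.format(new Date(eventTimeMs));
  }

  //column name: milliseconds elapsed since the start of that hour, as an 8 byte long
  public static ByteBuffer columnName(long eventTimeMs) {
    long msSinceHour = eventTimeMs % (3600L * 1000L);
    ByteBuffer buf = ByteBuffer.allocate(8);
    buf.putLong(msSinceHour);
    buf.flip();
    return buf;
  }
}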

Here is how the slice range constructor is defined. It gets used in the Cassandra API get_slice(). As you can tell, there is no way to set a range that will guarantee the return of the expected number of items.

SliceRange(byte[] start, byte[] finish, boolean reversed, int count)

The only thing you can do is to set start low enough and finish high enough, and hope that the number of columns in that range is greater than count, resulting in count number of items being returned from the query. But there is no guarantee, and nobody likes hope-based logic.
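
For reference, a raw slice query over this model looks roughly like the sketch below. It assumes the ByteBuffer based Thrift API; the column family name EventData and the fetchSlice() helper are made up for illustration.

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;

public class SliceQueryExample {
  //fetch up to count columns whose long names fall between startMs and endMs
  public static List<ColumnOrSuperColumn> fetchSlice(Cassandra.Client client, ByteBuffer rowKey,
      long startMs, long endMs, int count) throws Exception {
    SliceRange range = new SliceRange(longToByteBuffer(startMs), longToByteBuffer(endMs), false, count);
    SlicePredicate predicate = new SlicePredicate();
    predicate.setSlice_range(range);
    ColumnParent parent = new ColumnParent("EventData");
    return client.get_slice(rowKey, parent, predicate, ConsistencyLevel.ONE);
  }

  private static ByteBuffer longToByteBuffer(long value) {
    ByteBuffer buf = ByteBuffer.allocate(8);
    buf.putLong(value);
    buf.flip();
    return buf;
  }
}

Whatever start and finish you pick, count is only an upper bound on what comes back.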

In our example, with a fluctuating data arrival rate, it is difficult to select a slice range that yields the same predetermined number of columns from each call.

Adaptive Range Selection

RangeReader is adaptive because it adapts itself to return a predictable number of items for each query execution. In the RangeReader class, some of the important constructor arguments related to range adjustment are as follows:

batchSize: desired number of columns to be returned
maxFetchSize: maximum number of columns to be returned
batchSizeTolerance: determines whether the range needs to be adjusted, based on how much the returned column count deviates from the batch size
startCol: start column
initialRangeSize: initial range size

RangeReader tries to return a number of columns as close as possible to batchSize. However, the actual number of columns returned could be anywhere between 0 and maxFetchSize.

It works like a feedback loop. Before executing the query, it checks the result of the last query invocation. If the last query invocation returned fewer than batchSize columns, the range is increased proportionally, and vice versa.

If the last query execution returned no data, the range is doubled, i.e., it is increased non-linearly until the query returns some data.
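
The adjustment boils down to something like the following standalone sketch. It is a simplified illustration of the feedback idea with made-up names, not the actual RangeReader code.

//simplified illustration of the feedback based range adjustment
public class AdaptiveRange {
  private long curRangeSize;
  private final int batchSize;

  public AdaptiveRange(long initialRangeSize, int batchSize) {
    this.curRangeSize = initialRangeSize;
    this.batchSize = batchSize;
  }

  //called after each query with the number of columns it returned
  public void adjust(int lastFetchCount) {
    if (lastFetchCount == 0) {
      //no data in the last range: keep doubling
      curRangeSize *= 2;
    } else {
      //proportional correction: half the desired count doubles the range,
      //twice the desired count halves it
      curRangeSize = curRangeSize * batchSize / lastFetchCount;
    }
  }

  //end of the next column range, given where the last one ended
  public long nextRangeEnd(long rangeStart) {
    return rangeStart + curRangeSize;
  }
}

In RangeReader the correction kicks in only when the deviation exceeds the tolerance set by batchSizeTolerance.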

Here is the main query method, which the client application calls repeatedly to navigate through the columns in a row, along with the helper that sets the column range.


public List getColumnValues() throws Exception {
  if (!atRowEnd) {
    //set column range
    setEndCol();
    colRange.clear();
    colRange.add(startCol);
    colRange.add(endCol);

    //range query
    colValues = dataAccess.retrieveSubColumns(rowKey, superCol,
      colRange, true, maxFetchSize, consLevel);
    atRowEnd = endCol == endMarker && colValues.size() < maxFetchSize;

    //feedback for the next call: remember the fetch count and advance the start
    lastFetchCount = colValues.size();
    startCol = endCol;
    startColLong = endColLong;
  }
  return colValues;
}

private void setEndCol() {
  if (lastFetchCount >= 0) {
    if (lastFetchCount > batchSizeMax || lastFetchCount < batchSizeMin) {
      //scale the range in proportion to the deviation from batch size; double it if nothing came back
      curRangeSize = lastFetchCount > 0 ? curRangeSize * batchSize / lastFetchCount : curRangeSize * 2;
    }
  }

  if (endMarkerLong - curRangeSize > startColLong) {
    endColLong = startColLong + curRangeSize;
    endCol = Util.getByteBufferFromLong(endColLong);
  } else {
    endCol = endMarker;
  }
}



The full source code can be viewed here. It uses a simple greedy algorithm to adjust the range based on the local distribution of data. Currently it supports only LongType columns. In the future I will add support for UTF8Type.

Pagination

Although the RangeReader class makes a best effort to return batchSize columns, that may not be good enough if you want exactly batchSize columns returned.

One way to achieve pagination is to have a Paginator class which does some extra buffering on the result returned by RangeReader.

If the number of columns returned by RangeReader is less than batchSize, it makes repeated calls to RangeReader until batchSize columns have been fetched. On the other hand, if the number of columns returned by RangeReader is more than batchSize, only batchSize columns are returned to the client and the rest is cached. I will be implementing the Paginator class in the near future. So, stay tuned.
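
A rough sketch of such a Paginator is below. It is hypothetical code, assuming an isAtRowEnd() accessor on RangeReader, and only illustrates the buffering idea.

import java.util.ArrayList;
import java.util.List;

//hypothetical Paginator buffering on top of RangeReader
public class Paginator {
  private final RangeReader rangeReader;
  private final int batchSize;
  private final List<Object> buffer = new ArrayList<Object>();

  public Paginator(RangeReader rangeReader, int batchSize) {
    this.rangeReader = rangeReader;
    this.batchSize = batchSize;
  }

  //returns exactly batchSize columns, except possibly for the last page of the row
  public List<Object> nextPage() throws Exception {
    //keep calling RangeReader until a full page is buffered or the row is exhausted
    while (buffer.size() < batchSize && !rangeReader.isAtRowEnd()) {
      buffer.addAll(rangeReader.getColumnValues());
    }
    int pageEnd = Math.min(batchSize, buffer.size());
    List<Object> page = new ArrayList<Object>(buffer.subList(0, pageEnd));
    //cache the surplus for the next call
    buffer.subList(0, pageEnd).clear();
    return page;
  }
}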

For commercial support for any solution in my github repositories, please talk to ThirdEye Data Science Services. Support is available for Hadoop or Spark deployment on the cloud, including installation, configuration and testing.

About Pranab

I am Pranab Ghosh, a software professional in the San Francisco Bay area. I manipulate bits and bytes for the good of living beings and the planet. I have worked with a myriad of technologies and platforms in various business domains for early stage startups, large corporations and everything in between. I am an active blogger and open source project owner. I am passionate about technology and green and sustainable living. My technical interest areas are Big Data, Distributed Processing, NOSQL databases, Machine Learning and Programming languages. I am fascinated by problems that don't have a neat closed form solution.

Responses to Cassandra Range Query Made Simple


  Vladimir says:

    Hi Pranab,
    Can you give an example of constructing a RangeReader object? Its constructor accepts quite a large number of parameters, and it is unclear what types you pass for the params declared as Object (rowKey, startCol, superCol, etc.).

    Thanks.
    Vladimir

  Gaurav Kumar says:

    Hello Pranab,

    Thanks for the nice article.

    Our use case is a bit different. In our case the column names are session ids (i.e., alphanumeric or UTF8Type), and we don't even know the start column name.

    KR,
    Gaurav

    • Pranab says:

      Gaurav
      I have been meaning to support UTF8. Will do it at the earliest opportunity. For UTF8, you could start with byte value 0, if you don’t know the start column. The algorithm will scan with larger range sizes and will eventually get to the actual start column.

      Pranab
