Easy Cassandra Data Access


This post is about a simple, no-nonsense data access API for Cassandra. I did not start with a grandiose plan for yet another high-level Cassandra API; I was implementing a Cassandra-based BPM that I blogged about earlier.

As I was implementing, I kept refactoring my Thrift-based Cassandra access code into a separate package and a set of classes, so the API grew organically out of that implementation and refactoring. It’s simple and intuitive and contains fewer than half a dozen classes. It’s not complete by any means, but it covers the most basic cases.

Cassandra data access operations

In a Cassandra-based application, you essentially perform your CRUD operations on a set of regular columns or on a set of columns under a super column. In Cassandra, column names and values are stored as byte arrays, so basic data types need to be serialized to byte arrays as data moves from the application to Cassandra, and deserialized back to basic data types as the application reads data from Cassandra.
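For example, with nothing but the JDK (the Util class later in this post wraps the same idea):

import java.nio.ByteBuffer;
import java.nio.charset.Charset;

// Round-tripping a string and a long through byte arrays, as the Thrift interface requires.
public class ByteConversionExample {
    public static void main(String[] args) {
        Charset utf8 = Charset.forName("UTF-8");

        byte[] nameBytes  = "transitionTime".getBytes(utf8);
        byte[] valueBytes = ByteBuffer.allocate(8).putLong(1234567890L).array();

        String name = new String(nameBytes, utf8);
        long value  = ByteBuffer.wrap(valueBytes).getLong();
        System.out.println(name + " -> " + value);
    }
}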

Cassandra generally does not care what the byte stream represents and does not try to interpret the data, except for column names. Cassandra provides a unique feature of sorting columns by column name. The column name may be interpreted in the following ways:

Compare With        Data Interpretation
------------        -------------------
BytesType           Default; no interpretation
AsciiType           US ASCII string
UTF8Type            UTF-8 encoded string
LongType            64-bit long
LexicalUUIDType     128-bit UUID, compared as a byte array
TimeUUIDType        128-bit version 1 UUID, compared by timestamp

These attributes are specified per column family in the keyspace configuration file. For a column family with super columns, the attribute can be specified separately for super column names and column names.

The API

The main class containing the Cassandra data access methods is DataManager, shown below. There are some supporting data transfer and serialization classes, discussed later.


public class DataManager {

 public static SuperColumnValue  retrieveSuperColumn(String keySpace,
 String colFamilly, String rowKey,  byte[] superCol)
 throws Exception {
 Cassandra.Client   client = Connector.instance().openConnection();

 SlicePredicate slicePredicate = new SlicePredicate();
 SliceRange sliceRange = new SliceRange();
 sliceRange.setStart(new byte[] {});
 sliceRange.setFinish(new byte[] {});
 slicePredicate.setSlice_range(sliceRange);

 ColumnParent colPar = new ColumnParent(colFamilly);
 colPar.setSuper_column(superCol);
 List<ColumnOrSuperColumn> result =  client.get_slice(keySpace, rowKey,
 colPar, slicePredicate, ConsistencyLevel.ONE);

 SuperColumnValue superColVal = new SuperColumnValue();
 superColVal.setName(superCol);
 List<ColumnValue> colValues = new ArrayList<ColumnValue>();

 for (ColumnOrSuperColumn colSup : result){
 Column col = colSup.getColumn();
 if (null != col){
 ColumnValue colVal = new ColumnValue();
 colVal.setName(col.getName());
 colVal.setValue(col.getValue());
 colValues.add(colVal);
 }
 }
 superColVal.setValues(colValues);

 return superColVal;
 }

 public static List<SuperColumnValue>  retrieveSuperColumns( String keySpace,
 String colFamilly, String rowKey,  List<byte[]> superCols, boolean isRange)
 throws Exception {
 Cassandra.Client   client = Connector.instance().openConnection();

 SlicePredicate slicePredicate = new SlicePredicate();
 if (isRange) {
 SliceRange sliceRange = new SliceRange();
 sliceRange.setStart(superCols.get(0));
 sliceRange.setFinish(superCols.get(1));
 slicePredicate.setSlice_range(sliceRange);
 } else {
 slicePredicate.setColumn_names(superCols);
 }

 ColumnParent colPar = new ColumnParent(colFamilly);
 List<ColumnOrSuperColumn> result =  client.get_slice(keySpace, rowKey,
 colPar, slicePredicate, ConsistencyLevel.ONE);

 List<SuperColumnValue> superColVals = new ArrayList<SuperColumnValue>();
 for (ColumnOrSuperColumn colSup : result){
 SuperColumn superCol = colSup.getSuper_column();
 if (null != superCol){
 SuperColumnValue superColVal = new SuperColumnValue();
 superColVal.setName(superCol.getName());
 List<ColumnValue> colValues = new ArrayList<ColumnValue>();
 for (Column col : superCol.getColumns()) {
 ColumnValue colVal = new ColumnValue();
 colVal.setName(col.getName());
 colVal.setValue(col.getValue());
 colValues.add(colVal);
 }
 superColVal.setValues(colValues);
 superColVals.add(superColVal);
 }
 }

 return superColVals;
 }

 public static  void  updateSuperColumns(String keySpace, String colFamilly,
 String rowKey, List<SuperColumnValue> superColVals)
 throws Exception{
 Cassandra.Client   client = Connector.instance().openConnection();
 long timestamp = System.currentTimeMillis();
 Map<String, Map<String, List<Mutation>>> job =
 new HashMap<String, Map<String, List<Mutation>>>();
 List<Mutation> mutations = new ArrayList<Mutation>();

 for (SuperColumnValue superColVal : superColVals ){
 byte[] superCol =  superColVal.getName();
 List<ColumnValue>  cols = superColVal.getValues();

 List<Column> columns = new ArrayList<Column>();
 for (ColumnValue colVal : cols){
 columns.add(new Column(colVal.getName(), colVal.getValue(), timestamp));
 }

 SuperColumn superColumn = new SuperColumn(superCol, columns);
 ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
 columnOrSuperColumn.setSuper_column(superColumn);

 Mutation mutation = new Mutation();
 mutation.setColumn_or_supercolumn(columnOrSuperColumn);
 mutations.add(mutation);
 }

 Map<String, List<Mutation>> mutationsForColumnFamily =
 new HashMap<String, List<Mutation>>();
 mutationsForColumnFamily.put(colFamilly, mutations);

 job.put(rowKey, mutationsForColumnFamily);
 client.batch_mutate(keySpace, job, ConsistencyLevel.ALL);
 }

 public static void insertSuperColumns(String keySpace, String colFamilly,
 String rowKey, List<SuperColumnValue> superColVals)
 throws Exception {
 Cassandra.Client   client = Connector.instance().openConnection();
 long timestamp = System.currentTimeMillis();

 List<ColumnOrSuperColumn> colSuperColumns =
 new ArrayList<ColumnOrSuperColumn>();
 for (SuperColumnValue superColVal : superColVals ){
 byte[] superCol =  superColVal.getName();
 List<ColumnValue>  cols = superColVal.getValues();

 List<Column> columns = new ArrayList<Column>();
 for (ColumnValue colVal : cols){
 columns.add(new Column(colVal.getName(), colVal.getValue(), timestamp));
 }

 SuperColumn superColumn = new SuperColumn(superCol, columns);
 ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
 columnOrSuperColumn.setSuper_column(superColumn);

 colSuperColumns.add(columnOrSuperColumn);
 }

 Map<String, List<ColumnOrSuperColumn>> job =
 new HashMap<String, List<ColumnOrSuperColumn>>();
 job.put(colFamilly,  colSuperColumns);
 client.batch_insert(keySpace, rowKey, job, ConsistencyLevel.ALL);
 }

 public static List<ColumnValue>  retrieveColumns(String keySpace,
 String colFamilly, String rowKey)
 throws Exception {
 Cassandra.Client   client = Connector.instance().openConnection();

 SlicePredicate slicePredicate = new SlicePredicate();
 SliceRange sliceRange = new SliceRange();
 sliceRange.setStart(new byte[] {});
 sliceRange.setFinish(new byte[] {});
 slicePredicate.setSlice_range(sliceRange);

 ColumnParent colPar = new ColumnParent(colFamilly);
 List<ColumnOrSuperColumn> result =  client.get_slice(keySpace,
 rowKey, colPar, slicePredicate, ConsistencyLevel.ONE);

 List<ColumnValue> colValues = new ArrayList<ColumnValue>();
 for (ColumnOrSuperColumn colSup : result){
 Column col = colSup.getColumn();
 if (null != col){
 ColumnValue colVal = new ColumnValue();
 colVal.setName(col.getName());
 colVal.setValue(col.getValue());
 colValues.add(colVal);
 }
 }

 return colValues;
 }

 public static void insertColumns(String keySpace, String colFamilly,
 String rowKey, List<ColumnValue> colVals)
 throws Exception {
 Cassandra.Client   client = Connector.instance().openConnection();
 long timestamp = System.currentTimeMillis();

 List<ColumnOrSuperColumn> colSuperColumns =
 new ArrayList<ColumnOrSuperColumn>();
 for (ColumnValue colVal : colVals ){
 Column col =  new Column(colVal.getName(), colVal.getValue(), timestamp);
 ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
 columnOrSuperColumn.setColumn(col);
 colSuperColumns.add(columnOrSuperColumn);
 }
 Map<String, List<ColumnOrSuperColumn>> job = new HashMap<String,
 List<ColumnOrSuperColumn>>();
 job.put(colFamilly,  colSuperColumns);
 client.batch_insert(keySpace, rowKey, job, ConsistencyLevel.ALL);
 }

 public static  void  updateColumns(String keySpace, String colFamilly,
 String rowKey, List<ColumnValue> colVals)
 throws Exception{
 Cassandra.Client   client = Connector.instance().openConnection();
 long timestamp = System.currentTimeMillis();

 Map<String, Map<String, List<Mutation>>> job =
 new HashMap<String, Map<String, List<Mutation>>>();
 List<Mutation> mutations = new ArrayList<Mutation>();

 List<Column> columns = new ArrayList<Column>();
 for (ColumnValue colVal : colVals){
 Column col = new Column(colVal.getName(), colVal.getValue(), timestamp);
 ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
 columnOrSuperColumn.setColumn(col);
 Mutation mutation = new Mutation();
 mutation.setColumn_or_supercolumn(columnOrSuperColumn);
 mutations.add(mutation);
 }

 Map<String, List<Mutation>> mutationsForColumnFamily =
 new HashMap<String, List<Mutation>>();
 mutationsForColumnFamily.put(colFamilly, mutations);

 job.put(rowKey, mutationsForColumnFamily);
 client.batch_mutate(keySpace, job, ConsistencyLevel.ALL);

 }

}

What I have here are insert, update and retrieve methods for a set of columns and super columns. There are several limitations in the implementation. As you can see from the way the slice predicates are built, the retrieve methods read all columns under a column family or super column.

But the API can easily be extended so that the client application provides the slice predicate explicitly. A slice predicate specifies which columns to fetch in a read operation. I don’t have any key range related API either, as you may have noticed.

The consistency level is hard coded in the API implementation. It could be made part of the method signatures and passed explicitly by the client application.
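As a rough illustration of both extensions (not part of the code above; the method shape and the keyspace name in the usage snippet are assumptions), a more flexible variant of retrieveColumns() could accept the slice predicate and the consistency level from the caller:

// Sketch of an extended read method; the caller decides which columns to
// slice and at what consistency level, instead of the hard coded defaults.
public static List<ColumnValue> retrieveColumns(String keySpace, String colFamilly,
        String rowKey, SlicePredicate slicePredicate, ConsistencyLevel consistency)
        throws Exception {
    Cassandra.Client client = Connector.instance().openConnection();

    ColumnParent colPar = new ColumnParent(colFamilly);
    List<ColumnOrSuperColumn> result = client.get_slice(keySpace, rowKey,
            colPar, slicePredicate, consistency);

    List<ColumnValue> colValues = new ArrayList<ColumnValue>();
    for (ColumnOrSuperColumn colSup : result) {
        Column col = colSup.getColumn();
        if (null != col) {
            ColumnValue colVal = new ColumnValue();
            colVal.setName(col.getName());
            colVal.setValue(col.getValue());
            colValues.add(colVal);
        }
    }
    return colValues;
}

A caller could then fetch just a couple of named columns at QUORUM, for example:

SlicePredicate pred = new SlicePredicate();
List<byte[]> names = new ArrayList<byte[]>();
names.add(Util.getBytesFromString("state"));
names.add(Util.getBytesFromString("status"));
pred.setColumn_names(names);
List<ColumnValue> cols = DataManager.retrieveColumns("myKeyspace", "Process",
        rowKey, pred, ConsistencyLevel.QUORUM);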

The connection manager manages only one connection to Cassandra and does not use connection pooling, so it can only be used by a single-threaded application. But it should be relatively straightforward to implement a connection pool using the Apache Commons Pool library for a multi-threaded client application.
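For instance, a thin pool around the Thrift client could be put together with Commons Pool (1.x API) roughly as follows. This is only a sketch, not part of the code in this post; the class name, host and port are assumptions, and transports are not tracked for closing here.

import org.apache.cassandra.thrift.Cassandra;
import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

// Rough sketch of a pooled connector for a multi-threaded client.
public class PooledConnector {
    private final GenericObjectPool pool;

    public PooledConnector(final String host, final int port, int maxActive) {
        BasePoolableObjectFactory factory = new BasePoolableObjectFactory() {
            public Object makeObject() throws Exception {
                // Each pooled object is a Thrift client on its own open transport.
                TTransport transport = new TSocket(host, port);
                transport.open();
                return new Cassandra.Client(new TBinaryProtocol(transport));
            }
        };
        pool = new GenericObjectPool(factory, maxActive);
    }

    public Cassandra.Client borrow() throws Exception {
        return (Cassandra.Client) pool.borrowObject();
    }

    public void release(Cassandra.Client client) throws Exception {
        pool.returnObject(client);
    }
}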

Data Transfer Classes

The API relies heavily on the ColumnValue and SuperColumnValue classes. They follow the familiar data transfer object pattern. These classes encapsulate the Cassandra Thrift column and super column related classes. In addition to transferring data, they are also responsible for data serialization and deserialization. The ColumnValue class is as follows:


public class ColumnValue  extends BaseColumnValue{
 private byte[] value;

 public void write(String name, String value) throws IOException{
 this.name = Util.getBytesFromString(name) ;
 this.value = Util.getBytesFromString(value) ;
 }

 public String[] read() throws IOException{
 String[] nameValue = new String[2];
 nameValue[0] = Util.getStringFromBytes(name);
 nameValue[1] = Util.getStringFromBytes(value);
 return nameValue;
 }

 public byte[] getValue() {
 return value;
 }

 public String getValueAsString()  throws IOException {
 return Util.getStringFromBytes(value);
 }

 public long getValueAsLong() throws IOException {
 return Util.getLongFromBytes(value);
 }

 public void setValue(byte[] value) {
 this.value = value;
 }

 public void setValueFromString(String value) throws IOException {
 this.value = Util.getBytesFromString(value);
 }

 public void setValueFromLong(long value) throws IOException {
 this.value = Util.getBytesFromLong(value);
 }

}

This class essentially contains a column name and value. A list of these objects can be used for operating on a set of columns. The SuperColumnValue class contains a super column name and a list of ColumnValue instances, as shown below. This class is used for operating on a set of super columns.


public class SuperColumnValue  extends BaseColumnValue{
 private List<ColumnValue> values;

 public void  write(String name, Map<String, String> colValues) throws Exception{
 this.name = name.getBytes(Util.ENCODING);
 values = new ArrayList<ColumnValue>();
 for (String colName : colValues.keySet()){
 ColumnValue colVal = new ColumnValue();
 colVal.write(colName, colValues.get(colName));
 values.add(colVal);
 }
 }

 public Map<String, String> read() throws Exception{
 Map<String, String> columns = new HashMap<String, String>();
 for (ColumnValue colVal : values){
 String name = new String(colVal.getName(),Util.ENCODING);
 String value = new String(colVal.getValue(), Util.ENCODING);
 columns.put(name, value);
 }
 return columns;
 }

 /**
 * @return the values
 */
 public List<ColumnValue> getValues() {
 return values;
 }

 /**
 * @param values the values to set
 */
 public void setValues(List<ColumnValue> values) {
 this.values = values;
 }

}

Since columns and super columns both have a name, the name is handled through a common BaseColumnValue base class, shown below. Both ColumnValue and SuperColumnValue extend it.

public class BaseColumnValue {
 protected byte[] name;

 /**
 * @return the name
 */
 public byte[] getName() {
 return name;
 }

 public String getNameAsString()  throws IOException{
 return Util.getStringFromBytes(name);
 }

 public long getNameAsLong() throws IOException {
 return Util.getLongFromBytes(name);
 }

 /**
 * @param name the name to set
 */
 public void setName(byte[] name) {
 this.name = name;
 }

 public void setNameFromString(String name) throws IOException {
 this.name = Util.getBytesFromString(name) ;
 }

 public void setNameFromLong(long name) throws IOException {
 this.name = Util.getBytesFromLong(name);
 }

}

I am only handling String and long data types in these classes, but support for other data types can easily be added, as sketched below.
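For instance, double support would mostly come down to a pair of extra helpers along these lines, mirroring the long methods in the Util class shown in the next section (a sketch of what could be added, not existing code):

// Possible double helpers for the Util class; same stream-based style as the long methods.
public static byte[] getBytesFromDouble(double value) throws IOException{
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(bos);
    dos.writeDouble(value);
    dos.flush();
    return bos.toByteArray();
}

public static double getDoubleFromBytes(byte[] data) throws IOException{
    return new DataInputStream(new ByteArrayInputStream(data)).readDouble();
}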

Misc stuff

I also have a Util class that contains all the serialization code and a few helper methods:

public class Util {
 public static final String ENCODING = "utf-8";

 public static byte[] getBytesFromString(String value) throws IOException{
 return value.getBytes(Util.ENCODING);
 }

 public static String getStringFromBytes(byte[] data) throws IOException{
 return new String(data, ENCODING);
 }

 public static byte[] getBytesFromLong(long value) throws IOException{
 ByteArrayOutputStream bos = new ByteArrayOutputStream();
 DataOutputStream dos = new DataOutputStream(bos);
 dos.writeLong(value);
 dos.flush();
 return bos.toByteArray();
 }

 public static Long getLongFromBytes(byte[] data) throws IOException{
 ByteArrayInputStream bis = new ByteArrayInputStream(data);
 DataInputStream dis = new DataInputStream(bis);
 Long value =  dis.readLong();
 return value;
 }

 public static Map<String, String> getColumns(List<ColumnOrSuperColumn> cList) throws Exception{
 Map<String, String> columns = new HashMap<String, String>();

 for (ColumnOrSuperColumn colSup : cList){
 Column col = colSup.getColumn();
 if (null != col){
 String name = getStringFromBytes(col.getName());
 String value = getStringFromBytes(col.getValue());
 columns.put(name, value);
 }
 }
 return columns;
 }

}

I have skipped the Connector class; it’s trivial. It creates a Cassandra client connection and caches it until a closeConnection() call is made. Every openConnection() call returns the same connection object. As I mentioned, this begs to be implemented with an object pool, with the necessary pool management features.
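For completeness, a minimal Connector along the lines described here could look like the sketch below; the host and port are assumptions and the actual class may differ.

import org.apache.cassandra.thrift.Cassandra;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

// Minimal sketch of the Connector described above: a singleton that opens one
// Thrift connection, hands back the same client on every call, and closes it on demand.
public class Connector {
    private static final Connector INSTANCE = new Connector();
    private TTransport transport;
    private Cassandra.Client client;

    public static Connector instance() {
        return INSTANCE;
    }

    public synchronized Cassandra.Client openConnection() throws Exception {
        if (null == client) {
            transport = new TSocket("localhost", 9160);   // assumed host and port
            transport.open();
            client = new Cassandra.Client(new TBinaryProtocol(transport));
        }
        return client;
    }

    public synchronized void closeConnection() {
        if (null != transport) {
            transport.close();
            transport = null;
            client = null;
        }
    }
}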

Usage

As I mentioned earlier, I digressed into implementing this API while building a BPM. Here is an example of creating a business process from my BPM application using this API.


public void createProcess(Process process, Map<String, String> context){
 try {
 List<SuperColumnValue> superColValues = new ArrayList<SuperColumnValue>();
 String rowKey = process.getProcessSchemaId() + ":" + process.getId();
 byte[] superCol =  Util.getBytesFromString("procState");
 List<ColumnValue> colValues = new ArrayList<ColumnValue>();
 ColumnValue colVal = new ColumnValue();
 colVal.write("state", "" + process.getState());
 colValues.add(colVal);
 colVal = new ColumnValue();
 colVal.write("event", "" + process.getEvent());
 colValues.add(colVal);
 colVal = new ColumnValue();
 colVal.write("prevState", "" + process.getPrevState());
 colValues.add(colVal);
 colVal = new ColumnValue();
 colVal.write("status", process.getStatus());
 colValues.add(colVal);
 colVal = new ColumnValue();
 colVal.write("transitionTime", "" + process.getTransitionTime());
 colValues.add(colVal);
 colVal = new ColumnValue();
 colVal.write("processSchemaId",  process.getProcessSchemaId());
 colValues.add(colVal);

 SuperColumnValue superColVal = new SuperColumnValue();
 superColVal.setName(superCol);
 superColVal.setValues(colValues);
 superColValues.add(superColVal);

 if (null != context){
 superCol = Util.getBytesFromString("procContext");
 colValues = new ArrayList<ColumnValue>();
 for (String key : context.keySet()){
 String value = context.get(key);
 colVal = new ColumnValue();
 colVal.write(key, context.get(key));
 colValues.add(colVal);
 }

 superColVal = new SuperColumnValue();
 superColVal.setName(superCol);
 superColVal.setValues(colValues);
 superColValues.add(superColVal);
 }
 DataManager.insertSuperColumns(KEYSPACE, COLUMN_FAMILY_PROCESS, rowKey,
 superColValues);
 } catch (Exception ex) {
 System.out.println("Failed to create process ");
 ex.printStackTrace();
 }
 }

I am inserting a super column here. I create a list of ColumnValue objects, one for each attribute of the Process object. Then I create a SuperColumnValue object and call insertSuperColumns() on the DataManager.

The Process object attributes get saved in the column family Process under the super column procState. If any context gets passed, it gets saved under the super column procContext.

Data serialization

When I store the different attributes of the Process object as column values, I simply convert the basic data type to a string and then to a byte array. You could also serialize the basic data type (e.g., transitionTime, which is a long) directly to a byte array. Either way is fine, as long as the serialization and deserialization logic follow the same approach.
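For the transitionTime attribute, the two approaches look roughly like this (an illustration using the ColumnValue methods from above, in the context of createProcess()):

ColumnValue colVal = new ColumnValue();

// Approach 1: convert the long to a string first, as createProcess() does.
colVal.write("transitionTime", "" + process.getTransitionTime());

// Approach 2: serialize the long directly to a byte array.
colVal.setNameFromString("transitionTime");
colVal.setValueFromLong(process.getTransitionTime());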

One caveat to keep in mind: if you have chosen a column sort type other than the default and your application logic relies on sorted columns, then the column name must be serialized and deserialized appropriately. For example, if LongType is chosen for some columns, then the long value used as the column name should be serialized directly with the Util method getBytesFromLong(). Why would anyone use a long for a column name? It is useful for any data that you want to keep sorted by some kind of sequence, e.g., time series data.
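For example, with a column family whose CompareWith is LongType, a time series sample could be written with the timestamp serialized directly as the column name. The keyspace, column family and row key below are made-up names, just to sketch the idea.

// Assumes the column family's CompareWith is LongType, so Cassandra keeps
// the columns sorted by the numeric timestamp used as the column name.
ColumnValue sample = new ColumnValue();
sample.setNameFromLong(System.currentTimeMillis());  // column name is a 64-bit long
sample.setValueFromString("42.7");                   // the sampled value

List<ColumnValue> samples = new ArrayList<ColumnValue>();
samples.add(sample);
DataManager.updateColumns("myKeyspace", "SensorReadings", "sensor:17", samples);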

Concluding thoughts

I hope you liked my post. I took a simple and minimalist approach. As I said before, it’s not complete by any means, but it meets the needs of my project. Your comments and questions are most welcome.

I can think of some really nice extensions to the API, for example a reflection-based layer to transfer data back and forth between any Java bean and a list of ColumnValue objects. It could even be annotation driven, so that only the object attributes to be mapped to Cassandra columns are specified and transient attributes are skipped.
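A very rough sketch of the reflection half of that idea could look like this; the method is hypothetical, not part of the API, and only handles the String and long types used elsewhere in this post.

// Map a bean's fields to ColumnValue objects by reflection; an annotation such
// as a @Transient marker could be checked here to skip fields not meant for Cassandra.
public static List<ColumnValue> toColumnValues(Object bean) throws Exception {
    List<ColumnValue> colValues = new ArrayList<ColumnValue>();
    for (java.lang.reflect.Field field : bean.getClass().getDeclaredFields()) {
        field.setAccessible(true);
        Object value = field.get(bean);
        ColumnValue colVal = new ColumnValue();
        colVal.setNameFromString(field.getName());
        if (value instanceof Long) {
            colVal.setValueFromLong((Long) value);
        } else {
            colVal.setValueFromString(String.valueOf(value));
        }
        colValues.add(colVal);
    }
    return colValues;
}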

There are a number of high-level Cassandra API projects. The most popular and complete Cassandra API is Hector. Kundera is a JPA-like, annotation-based Cassandra API.

Update
I have substantially improved the implementation. It supports Cassandra 0.7.x. I have also added connection pooling and load balancing. The full source code is hosted on GitHub.


