Hive is great for large scale data warehousing applications. In one of my recent projects I was handed the interesting and challenging task of making Hive behave like an OLTP system, i.e., support update and delete. To be more specific, the Hive database needed to be kept in near real time synchronization with multiple OLTP systems. In this post, I will discuss the high level features of the solution, which is based on custom Hive input and output formats.
Imagine a national retail chain collecting sales data in regional OLTP databases. Periodically, the data from the multiple OLTP systems gets fed into Hive, which aggregates all the data. The data arrives frequently enough that Hive stays in near real time synchronization with the source OLTP systems.
Essentially, every time a record gets inserted, updated or deleted in a source OLTP system, it gets shipped to Hive. This process is also known as transactional replication. Here are the main issues I had to resolve to arrive at a satisfactory solution:
- Since Hive data cannot be updated once it is loaded, how do we handle new inserts, or updates to existing records?
- While writing, how do we decide whether an incoming record is an insert or an update, and how do we handle it in Hive? Hive does not have any notion of updates or deletes.
- For a query, how do we ensure that only the latest version of a record is returned?
Here is the Hive schema for the tables related to sales data. Each incoming batch of data is loaded into the following staging table using the Hive LOAD DATA statement.
CREATE EXTERNAL TABLE test.sell_stage (
  sell_id BIGINT,
  acct_id BIGINT,
  change_ts STRING,
  create_ts STRING,
  payment_type_id STRING,
  status_id STRING,
  total_amt DOUBLE,
  batch INT,
  op_code STRING,
  store_id STRING,
  create_date STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/pranab/db/stage/test/sell_stg';
The main table holding the sales data is as follows:
CREATE EXTERNAL TABLE test.sell (
  sell_id BIGINT,
  acct_id BIGINT,
  change_ts STRING,
  create_ts STRING,
  payment_type_id STRING,
  status_id STRING,
  total_amt DOUBLE,
  batch INT,
  op_code STRING
)
PARTITIONED BY (store_id STRING, create_date STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS RCFILE
LOCATION '/user/pranab/db/data/test/sell';
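This table is partitioned by store_id and create_date. Partitioning in Hive plays a role similar to indexes in a regular database: a query that filters on the partition columns only reads the matching partition directories. The table stores its data as RCFile, a columnar storage format for HDFS.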
A Naive Approach
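If we execute a Hive INSERT OVERWRITE query with only the new data, it will simply replace the existing data in the sell table with the new data. To keep the existing data, the query has to union the staged rows with the rows already in the table.
The query will look something like this: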
INSERT OVERWRITE TABLE test.sell PARTITION(store_id, create_date)
SELECT a.*
FROM (
  SELECT ss.* FROM test.sell_stage ss
  UNION ALL
  SELECT sd.* FROM test.sell sd
) a;
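This works, but we still have a big problem: every batch rewrites all of the existing data in the table, and since UNION ALL does not reconcile anything, an updated or deleted record ends up stored alongside its older version.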
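To make the effect of the union concrete, here is a small in-memory simulation in Python. It is only a sketch: the lists stand in for test.sell and test.sell_stage, the rows carry just three of the columns, and the function names are made up for illustration.

```python
# Hypothetical in-memory stand-ins for test.sell and test.sell_stage.
# Each row is (sell_id, status_id, op_code); the column subset is for brevity.
existing = [(1, "open", "insert"), (2, "open", "insert")]
staged = [(2, "paid", "update"), (3, "open", "insert")]


def naive_overwrite(staged_rows, existing_rows):
    """Mimic INSERT OVERWRITE over a UNION ALL: every row is written again."""
    return list(staged_rows) + list(existing_rows)


def versions_per_key(rows):
    """Count how many rows share each sell_id."""
    counts = {}
    for sell_id, _status, _op_code in rows:
        counts[sell_id] = counts.get(sell_id, 0) + 1
    return counts


table = naive_overwrite(staged, existing)
counts = versions_per_key(table)
print(len(table))   # all four rows are rewritten
print(counts[2])    # sell_id 2 is now stored twice
```

The two existing rows are written again even though only one of them changed, and sell_id 2 is present in both its old and new form.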
A Better Way
The solution required the following changes:
- Add another partition column called version_id. A new version_id corresponds to one or more batches. As we will see shortly, this makes it possible to add new data non-intrusively, without touching existing data.
- Add new table properties designating the primary key columns, the op code column and the version id column. The primary key column definitions are used to identify a record among existing data.
- Add custom input and output formats with an associated custom writer and reader. For an incoming record, the writer is responsible for finding out whether it’s a new record or an update of an existing record. It’s also responsible for tracking the latest version of a record. The reader is responsible for returning only the latest version of a record.
The schema changes described above are summarized below:
PARTITIONED BY (store_id STRING, create_date STRING, version_id INT)
SORTED BY (sell_id)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS
  INPUTFORMAT 'com.hadoop.input.act.like.database.RCFileInputFormat'
  OUTPUTFORMAT 'com.hadoop.input.act.like.database.RCFileOutputFormat'
LOCATION '/user/pranab/db/data/test/sell'
TBLPROPERTIES (
  'op_code_column' = 'op_code',
  'version_id_column' = 'version_id',
  'primary_key_columns' = 'sell_id'
);
We want to write the new data into a new partition, so that the existing data in the sell table can be left untouched.
The partitions for the sell table are now as follows:
PARTITIONED BY (store_id STRING, create_date STRING, version_id INT)
With this new partition in place, our INSERT OVERWRITE query will look like this:
INSERT OVERWRITE TABLE test.sell PARTITION(store_id, create_date, version_id) SELECT * FROM test.sell_stage_view
We are using a view called sell_stage_view, which is essentially the sell_stage table with an extra column at the end for the version_id. As a result, new data goes into a new version_id partition under the store_id and create_date partitions.
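The view definition is not shown in this post, so the following Python sketch only illustrates the idea. It relies on two Hive behaviors: in a dynamic partition insert the partition columns must come last in the SELECT, in the same order as in the PARTITION clause, and each partition column becomes one key=value directory level under the table location. The function names and the shortened row layout are assumptions made for the example.

```python
def with_version_id(staged_rows, version_id):
    """Append version_id as the last column, as the view is described to do."""
    return [row + (version_id,) for row in staged_rows]


def partition_dir(table_location, store_id, create_date, version_id):
    """Build the partition directory: one key=value level per partition column."""
    return "{}/store_id={}/create_date={}/version_id={}".format(
        table_location, store_id, create_date, version_id
    )


# Shortened staged row: (sell_id, total_amt, store_id, create_date).
staged = [(101, 25.0, "s01", "2013-05-01")]
rows = with_version_id(staged, 3)

# The last three columns are the dynamic partition values, in PARTITION order.
store_id, create_date, version_id = rows[0][-3:]
path = partition_dir("/user/pranab/db/data/test/sell", store_id, create_date, version_id)
print(path)
```

Because the staging table already ends with store_id and create_date, appending version_id as the final column gives exactly the column order the three-level PARTITION clause expects.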
The writer gets invoked when a Hive INSERT OVERWRITE query is executed. The custom writer logic for processing a record is as follows:
- Go through all the older version_id partitions under the store_id, create_date partition and check whether a record with the same primary key exists. If it does, it’s an update; otherwise it’s an insert.
- If it’s an update, write the tuple (primary key, version id) into a file in the same directory as the current data file. We will call this file the version index file; it could be a sequence file or a map file.
- Write the record, unless it’s a delete. Assuming that a deleted record arrives like any other record, with the op code set to “delete”, we simply update the version index file.
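The writer steps above can be sketched in Python as follows. This is not the actual Hive output format code: nested dicts stand in for the data files and version index files of a single store_id, create_date partition, and write_batch and its parameters are hypothetical names.

```python
def write_batch(partitions, version_index, version_id, records,
                key_column="sell_id", op_code_column="op_code"):
    """Write one batch into a new version_id partition.

    partitions:    version_id -> {primary key -> record}   (stand-in for data files)
    version_index: version_id -> {primary key -> version}  (stand-in for index files)
    """
    # Collect the primary keys present in any older version_id partition.
    older_keys = set()
    for vid, rows in partitions.items():
        if vid < version_id:
            older_keys.update(rows.keys())

    data = partitions.setdefault(version_id, {})
    index = version_index.setdefault(version_id, {})
    for record in records:
        key = record[key_column]
        if key in older_keys:
            # Update or delete of an existing record: note it in the version index.
            index[key] = version_id
        if record[op_code_column] != "delete":
            # Deletes only touch the version index; everything else is written.
            data[key] = record


partitions = {
    1: {
        1: {"sell_id": 1, "status_id": "open", "op_code": "insert"},
        2: {"sell_id": 2, "status_id": "open", "op_code": "insert"},
    }
}
version_index = {}
write_batch(partitions, version_index, 2, [
    {"sell_id": 2, "status_id": "paid", "op_code": "update"},
    {"sell_id": 3, "status_id": "open", "op_code": "insert"},
    {"sell_id": 1, "status_id": "open", "op_code": "delete"},
])
```

After the call, the version 2 partition holds records 2 and 3, the version 2 index lists keys 2 and 1, and the version 1 partition is unchanged.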
The reader gets invoked when a select query is executed. The steps for our custom reader are as follows:
- The input format class reads all the version index files with a version_id more recent than the version_id of the current data file. The data from all the version index files is loaded into an in-memory map in such a way that only the latest version_id of a given record is stored in the map.
- While processing a record, the reader checks the version index map to find out whether a later version of the same record exists in a later version_id partition. If so, the record is not returned and the reader moves on to the next record in the data file.
- Otherwise, it returns the record.
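A matching sketch of the reader steps, again in Python with nested dicts standing in for the data files and version index files. The function names and the sample data are illustrative assumptions, not the real input format API.

```python
def load_version_map(version_index, current_version_id):
    """Merge all index files newer than the current data file.

    Only the latest version_id seen for each primary key is kept.
    """
    latest = {}
    for vid, entries in version_index.items():
        if vid > current_version_id:
            for key, version in entries.items():
                if version > latest.get(key, current_version_id):
                    latest[key] = version
    return latest


def read_partition(partitions, version_index, version_id):
    """Yield the records of one version_id partition that are still current."""
    latest = load_version_map(version_index, version_id)
    for key, record in partitions.get(version_id, {}).items():
        if key in latest:
            continue  # superseded or deleted by a later version
        yield record


def read_table(partitions, version_index):
    """Read every version_id partition, oldest first."""
    rows = []
    for vid in sorted(partitions):
        rows.extend(read_partition(partitions, version_index, vid))
    return rows


# Version 2 updated record 2, inserted record 3 and deleted record 1.
partitions = {
    1: {1: {"sell_id": 1, "status_id": "open"},
        2: {"sell_id": 2, "status_id": "open"}},
    2: {2: {"sell_id": 2, "status_id": "paid"},
        3: {"sell_id": 3, "status_id": "open"}},
}
version_index = {2: {2: 2, 1: 2}}
result = sorted((r["sell_id"], r["status_id"]) for r in read_table(partitions, version_index))
```

Both rows in the version 1 partition are filtered out: record 2 because a newer version exists, and record 1 because its delete left an index entry without a matching data row.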
As you can imagine, as data accumulates, additional version_id partitions and version index files get created, which will impact performance. To keep this in check, the version_id partitions need to be compacted periodically. Compaction is particularly important for update heavy data.
Following compaction, only the latest version of a record is retained. Older versions are purged.
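The compaction procedure itself is not detailed in this post. One possible way to get the stated result, sketched in Python under the same in-memory model (dicts instead of data files and version index files), is to replay the versions in order and keep a single merged partition. The compact function and the choice to reuse the highest version_id are assumptions.

```python
def compact(partitions, version_index):
    """Collapse all version_id partitions into one holding only current records.

    partitions:    version_id -> {primary key -> record}
    version_index: version_id -> {primary key -> version}
    Returns a new (partitions, version_index) pair.
    """
    if not partitions:
        return {}, {}

    merged = {}
    for vid in sorted(set(partitions) | set(version_index)):
        # Keys listed in this version's index were updated or deleted here.
        for key in version_index.get(vid, {}):
            merged.pop(key, None)
        # Rows written in this version are the newest seen so far.
        merged.update(partitions.get(vid, {}))

    target_version = max(partitions)  # assumption: reuse the highest version_id
    return {target_version: merged}, {}


partitions = {
    1: {1: {"sell_id": 1, "status_id": "open"},
        2: {"sell_id": 2, "status_id": "open"}},
    2: {2: {"sell_id": 2, "status_id": "paid"},
        3: {"sell_id": 3, "status_id": "open"}},
}
version_index = {2: {2: 2, 1: 2}}
compacted, compacted_index = compact(partitions, version_index)
```

After compaction there is a single partition and no version index entries left, so the reader has nothing to filter.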
In this post, I have discussed only the high level artifacts of the architecture. There are a lot of details involved. For example, I used a Bloom filter to check for the existence of a record with the same primary key. I also used Hive’s bucketing feature to make the reader and writer more scalable.
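For readers unfamiliar with Bloom filters, here is a minimal Python sketch of the data structure. It is a generic textbook version, not the filter used in the project; the sizes, the SHA-256 based double hashing and the method names are assumptions chosen for the example.

```python
import hashlib


class BloomFilter:
    """A small Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive num_hashes bit positions from one digest via double hashing.
        digest = hashlib.sha256(str(key).encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))


# Primary keys already stored in older version_id partitions (sample values).
older_keys = BloomFilter()
for sell_id in range(1, 101):
    older_keys.add(sell_id)

empty = BloomFilter()
```

A negative answer from might_contain is definitive, so the writer can treat the record as an insert without scanning older partitions. A positive answer can be a false positive and still needs to be confirmed against the actual data.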