Hive is great for large-scale data warehousing applications. In one of my recent projects, I was handed the interesting and challenging task of making Hive behave like an OLTP system, i.e., supporting updates and deletes. More specifically, the Hive database needed to be kept in near real-time synchronization with multiple OLTP systems. In this post, I will discuss the high-level features of a solution based on a Hive custom input/output format.
The Scenario
Imagine a national retail chain collecting sales data in regional OLTP databases. Periodically, the data from these OLTP systems is fed into Hive, which aggregates all of it. Data arrives frequently enough that Hive stays in near real-time synchronization with the source OLTP systems.
Essentially, every time a record is inserted, updated or deleted in a source OLTP database, it gets shipped to Hive. This process is also known as transactional replication. Here are the main issues I had to solve to meet these requirements:
- Since data in Hive cannot be updated once it is loaded, how do we handle new inserts and updates to existing records?
- While writing, how do we decide whether an incoming record is an insert or an update, and how do we handle it in Hive? Hive has no notion of updates or deletes.
- For a query, how do we ensure that only the latest version of a record is returned?
Database Schema
Here is the Hive schema for the tables related to sales data. Each incoming batch of data is loaded into the following staging table using a Hive LOAD DATA statement.
CREATE EXTERNAL TABLE test.sell_stage (
  sell_id BIGINT,
  acct_id BIGINT,
  change_ts STRING,
  create_ts STRING,
  payment_type_id STRING,
  status_id STRING,
  total_amt DOUBLE,
  batch INT,
  op_code STRING,
  store_id STRING,
  create_date STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/pranab/db/stage/test/sell_stg';
The main table holding the sales data is as follows:
CREATE EXTERNAL TABLE test.sell (
  sell_id BIGINT,
  acct_id BIGINT,
  change_ts STRING,
  create_ts STRING,
  payment_type_id STRING,
  status_id STRING,
  total_amt DOUBLE,
  batch INT,
  op_code STRING
)
PARTITIONED BY (store_id STRING, create_date STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS RCFILE
LOCATION '/user/pranab/db/data/test/sell';
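This table is partitioned by store_id and create_date. Partitioning in Hive plays a role similar to indexes in a regular database: a query that filters on partition columns reads only the matching partition directories. The table stores its data as RCFile, a columnar storage format on HDFS.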
A Naive Approach
If we execute a Hive INSERT OVERWRITE query with just the new data, it will simply replace the existing data in the partitions of the sell table that it writes to.
But we want to add the new data without disturbing the existing data in the sell table. We could take the union of the existing data in sell and the new data in sell_stage, and then INSERT OVERWRITE the result into the sell table.
The query will look something like this:
INSERT OVERWRITE TABLE test.sell PARTITION (store_id, create_date)
SELECT a.* FROM (
  SELECT ss.* FROM test.sell_stage ss
  UNION ALL
  SELECT sd.* FROM test.sell sd
) a;
This works, but we still have a big problem.
The data in the sell table keeps growing. Reading all of the existing data and rewriting it back into the table for every batch is wasteful and will not scale.
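To put a rough number on "will not scale", here is a back-of-the-envelope model. It is written in Python and assumes a hypothetical workload. Every batch has the same number of rows, and every row is a new insert, so the table grows by one batch each time. The function names and numbers are illustrative only.

```python
# Back-of-the-envelope model: rows physically written to the sell table.
# Assumptions (hypothetical): every batch has the same number of rows and
# every row is a new insert, so the table grows by one batch each time.


def rows_written_naive(batch_rows: int, num_batches: int) -> int:
    """UNION ALL + INSERT OVERWRITE rewrites the whole table on every batch."""
    table_rows = 0
    total_written = 0
    for _ in range(num_batches):
        table_rows += batch_rows      # old rows plus the new batch
        total_written += table_rows   # ...and all of them get rewritten
    return total_written


def rows_written_incremental(batch_rows: int, num_batches: int) -> int:
    """Writing each batch into its own new partition touches only the new rows."""
    return batch_rows * num_batches


if __name__ == "__main__":
    b, n = 1_000_000, 100
    print("naive:      ", rows_written_naive(b, n))        # b * n * (n + 1) / 2
    print("incremental:", rows_written_incremental(b, n))  # b * n
```

With 100 batches of a million rows each, the naive approach writes about 5 billion rows in total, versus 100 million when each batch is written only once. The gap widens with every batch, because the naive cost grows quadratically in the number of batches.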
A Better Way
The solution required the following changes:
- Add another partition column called version_id. Each new version_id corresponds to one or more batches. As we will see shortly, this lets us add new data non-intrusively, without touching the existing data.
- Add new table properties designating the primary key columns, the op code column and the version ID column. The primary key columns are used to identify a record among the existing data.
- Add a custom input/output format with an associated custom writer and reader. For an incoming record, the writer is responsible for finding out whether it is a new record or an update of an existing record. It is also responsible for tracking the latest version of a record. The reader is responsible for returning only the latest version of a record.
The schema changes described above are summarized below:
PARTITIONED BY (store_id STRING, create_date STRING, version_id INT)
SORTED BY (sell_id)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS
  INPUTFORMAT 'com.hadoop.input.act.like.database.RCFileInputFormat'
  OUTPUTFORMAT 'com.hadoop.input.act.like.database.RCFileOutputFormat'
LOCATION '/user/pranab/db/data/test/sell'
TBLPROPERTIES (
  'op_code_column' = 'op_code',
  'version_id_column' = 'version_id',
  'primary_key_columns' = 'sell_id'
);
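The custom writer and reader need to interpret these three table properties. Hive input/output formats are Java classes, so the following is only a Python model of that logic. `VersioningConfig`, `parse_table_properties` and `primary_key` are hypothetical names. The sketch also assumes that `primary_key_columns` holds a comma-separated list when the key is composite; the post only shows a single column.

```python
from dataclasses import dataclass
from typing import Mapping, Tuple


@dataclass(frozen=True)
class VersioningConfig:
    """Hypothetical holder for the three custom TBLPROPERTIES."""
    op_code_column: str
    version_id_column: str
    primary_key_columns: Tuple[str, ...]


def parse_table_properties(props: Mapping[str, str]) -> VersioningConfig:
    # Assumption: a composite key is given as a comma-separated list,
    # e.g. 'primary_key_columns' = 'store_id,sell_id'.
    pk = tuple(c.strip() for c in props["primary_key_columns"].split(",") if c.strip())
    if not pk:
        raise ValueError("primary_key_columns must name at least one column")
    return VersioningConfig(
        op_code_column=props["op_code_column"],
        version_id_column=props["version_id_column"],
        primary_key_columns=pk,
    )


def primary_key(row: Mapping[str, object], cfg: VersioningConfig) -> Tuple[object, ...]:
    """Extract the key the writer and reader use to match versions of a record."""
    return tuple(row[c] for c in cfg.primary_key_columns)


# Values taken from the TBLPROPERTIES clause above.
cfg = parse_table_properties({
    "op_code_column": "op_code",
    "version_id_column": "version_id",
    "primary_key_columns": "sell_id",
})
print(cfg.primary_key_columns)                                 # ('sell_id',)
print(primary_key({"sell_id": 42, "op_code": "update"}, cfg))  # (42,)
```

Keeping the key as a tuple means single-column and composite keys are compared the same way.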
New Partition
We want to write the new data in a new partition, so that existing data on the sell table can be left untouched.
We introduce another partition called version_id. There is a unique version_id for one or more batches of data that we add to the sell table.
The partitions for the sell table are now as follows:
PARTITIONED BY (store_id STRING, create_date STRING, version_id INT)
With this new partition in place, our INSERT OVERWRITE query will look like this:
INSERT OVERWRITE TABLE test.sell PARTITION (store_id, create_date, version_id)
SELECT * FROM test.sell_stage_view;
We are using a view called sell_stage_view, which is essentially the sell_stage table with an extra column, version_id, at the end. As a result, the new data goes into a new version_id partition under the store_id and create_date partitions.
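Hive stores each partition in a nested key=value directory under the table location, in PARTITIONED BY order. The small Python sketch below shows where a batch lands. `partition_dir` is a hypothetical helper, and the store ID, date and version values are made up. It assumes the partition values contain no characters that Hive would escape.

```python
import posixpath

# Table location from the DDL above; partition values below are made up.
TABLE_LOCATION = "/user/pranab/db/data/test/sell"


def partition_dir(store_id: str, create_date: str, version_id: int,
                  location: str = TABLE_LOCATION) -> str:
    """Directory Hive uses for one (store_id, create_date, version_id) partition.

    Hive nests partition directories as key=value in PARTITIONED BY order.
    Assumes the values contain no characters that Hive would escape.
    """
    return posixpath.join(
        location,
        f"store_id={store_id}",
        f"create_date={create_date}",
        f"version_id={version_id}",
    )


old = partition_dir("S12", "2012-01-15", 7)   # written by an earlier batch
new = partition_dir("S12", "2012-01-15", 8)   # written by the current batch
print(old)
print(new)
# Both live under the same store_id/create_date directory; the new batch
# only adds a sibling directory and never rewrites the old one.
print(posixpath.dirname(old) == posixpath.dirname(new))  # True
```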
Writer
The writer gets invoked when a Hive INSERT OVERWRITE query is executed. The custom writer logic for processing a record is as follows:
- Go through all the older version_id partitions under the same store_id and create_date partition, and check whether a record with the same primary key exists. If it does, it's an update; otherwise, it's an insert.
- If it's an update, write the tuple (primary key, version ID) into a file in the same directory as the current data file. We will call this file the version index file; it could be a sequence file or a map file.
- Write the record, unless it's a delete. Assume a deleted record arrives like any other record, with the op code set to “delete”. In that case, we only update the version index file and do not write the record.
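The three writer steps can be modelled in a few lines of Python. This is an in-memory sketch, not the actual Hive OutputFormat, and `write_batch` is a hypothetical name. A dictionary stands in for the data files of the older version_id partitions, and a list stands in for the version index file. The sketch assumes a primary key appears at most once per batch. The “delete” op code value comes from the post. The “insert” and “update” values in the sample data are made up and are never used to make decisions, because the writer classifies a record by looking it up.

```python
from typing import Dict, List, Mapping, Tuple

Record = Dict[str, object]


def write_batch(
    batch: List[Record],
    older_partitions: Mapping[int, Mapping[object, Record]],
    current_version: int,
    pk_column: str = "sell_id",
    op_code_column: str = "op_code",
) -> Tuple[List[Record], List[Tuple[object, int]]]:
    """In-memory model of the custom writer for one store_id/create_date partition.

    older_partitions maps version_id -> {primary key -> record}; it stands in for
    the data files of the older version_id partitions. Returns the rows of the new
    data file and the (primary key, version_id) entries of its version index file.
    """
    data_file: List[Record] = []
    version_index: List[Tuple[object, int]] = []
    for record in batch:
        key = record[pk_column]
        # Step 1: insert or update? Look for the key in every older partition.
        exists = any(
            key in rows
            for version, rows in older_partitions.items()
            if version < current_version
        )
        # Step 2: for an update (or a delete of an existing record), note that
        # the latest version of this key now lives in current_version.
        if exists:
            version_index.append((key, current_version))
        # Step 3: a delete leaves only a version index entry, no data row.
        if record[op_code_column] != "delete":
            data_file.append(record)
    return data_file, version_index


older = {
    1: {
        101: {"sell_id": 101, "total_amt": 10.0, "op_code": "insert"},
        102: {"sell_id": 102, "total_amt": 5.0, "op_code": "insert"},
    }
}
batch = [
    {"sell_id": 101, "total_amt": 12.5, "op_code": "update"},
    {"sell_id": 102, "total_amt": 5.0, "op_code": "delete"},
    {"sell_id": 103, "total_amt": 7.0, "op_code": "insert"},
]
data_file, version_index = write_batch(batch, older, current_version=2)
print([r["sell_id"] for r in data_file])  # [101, 103]
print(version_index)                      # [(101, 2), (102, 2)]
```

The dictionary lookup in step 1 hides the real cost. Against actual data files, this existence check is the expensive part of the write.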
Reader
The reader gets invoked when a SELECT query is executed. The steps for our custom reader are as follows:
- The input format class reads all the version index files whose version_id is more recent than the version_id of the current data file. The entries from all these version index files are loaded into an in-memory map in such a way that only the latest version_id of a given record is stored.
- While processing a record, the reader checks the version index map to find out whether a later version of the same record exists in a later version_id partition. If so, the record is not returned, and the reader moves on to the next record in the data file.
- Otherwise, it returns the record.
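A matching in-memory sketch of the reader follows. Again it is Python rather than a real Hive InputFormat/RecordReader, and `load_version_index` and `read_partition` are hypothetical names. It assumes the version index file of each version_id partition has already been read into a list of (primary key, version_id) pairs, and that version IDs increase over time. The sample data models a later batch (version 2) that updated record 101 and deleted record 102.

```python
from typing import Dict, Iterable, Iterator, List, Mapping, Tuple

Record = Dict[str, object]
IndexEntry = Tuple[object, int]  # (primary key, version_id)


def load_version_index(
    index_files: Mapping[int, List[IndexEntry]], data_version: int
) -> Dict[object, int]:
    """Merge the index files newer than data_version into key -> latest version_id."""
    latest: Dict[object, int] = {}
    for file_version, entries in index_files.items():
        if file_version <= data_version:
            continue  # only index files from later version_id partitions matter
        for key, version in entries:
            if key not in latest or version > latest[key]:
                latest[key] = version
    return latest


def read_partition(
    data_file: Iterable[Record],
    data_version: int,
    index_files: Mapping[int, List[IndexEntry]],
    pk_column: str = "sell_id",
) -> Iterator[Record]:
    """Yield only the records that have not been superseded by a later version."""
    newer = load_version_index(index_files, data_version)
    for record in data_file:
        # Every entry in `newer` is later than data_version, so a hit means this
        # row was updated or deleted in a later version_id partition.
        if record[pk_column] in newer:
            continue
        yield record


# Two version_id partitions of one store_id/create_date partition.
partitions = {
    1: [{"sell_id": 101, "total_amt": 10.0}, {"sell_id": 102, "total_amt": 5.0}],
    2: [{"sell_id": 101, "total_amt": 12.5}, {"sell_id": 103, "total_amt": 7.0}],
}
# Version 2 updated 101 and deleted 102.
index_files = {2: [(101, 2), (102, 2)]}

result = [
    row
    for version, rows in sorted(partitions.items())
    for row in read_partition(rows, version, index_files)
]
print([(r["sell_id"], r["total_amt"]) for r in result])  # [(101, 12.5), (103, 7.0)]
```

Record 102 disappears from query results even though its old row is still on disk. The version index entry hides it, and the delete never produced a new data row.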
Compaction
As you can imagine, as data accumulates, more version_id partitions and version index files get created. This hurts performance, because every query has to scan more partitions and load more version index entries. Compaction is particularly important for update-heavy data.
The solution is to run an INSERT OVERWRITE query that reads all the existing version_id partitions and writes them into a new version_id partition. Then we drop the old version_id partitions.
Following compaction, only the latest version of a record is retained. Older versions are purged.
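Presumably the compaction query reads through the custom reader, which already hides superseded rows. If so, the new partition ends up with exactly the latest version of each record. The Python sketch below models that outcome in memory. `compact` is a hypothetical function, partitions are plain lists of rows, and version IDs are assumed to be integers that increase over time. A record deleted in a later partition simply vanishes, since no data row was ever written for the delete.

```python
from typing import Dict, List, Mapping, Tuple

Record = Dict[str, object]
IndexEntry = Tuple[object, int]  # (primary key, version_id)


def compact(
    partitions: Mapping[int, List[Record]],
    index_files: Mapping[int, List[IndexEntry]],
    new_version: int,
    pk_column: str = "sell_id",
) -> Dict[int, List[Record]]:
    """Rewrite all version_id partitions into one, keeping only the latest versions.

    Returns the partitions that remain after the old ones are dropped: a single
    new partition, which needs no version index file of its own.
    """
    if partitions and new_version <= max(partitions):
        raise ValueError("new_version must be newer than every existing version_id")

    # Latest version_id per key, across all version index files.
    latest: Dict[object, int] = {}
    for entries in index_files.values():
        for key, version in entries:
            latest[key] = max(version, latest.get(key, version))

    survivors: List[Record] = []
    for version in sorted(partitions):
        for record in partitions[version]:
            # Drop rows superseded (updated or deleted) by a later version.
            if latest.get(record[pk_column], version) > version:
                continue
            survivors.append(record)
    return {new_version: survivors}


partitions = {
    1: [{"sell_id": 101, "total_amt": 10.0}, {"sell_id": 102, "total_amt": 5.0}],
    2: [{"sell_id": 101, "total_amt": 12.5}, {"sell_id": 103, "total_amt": 7.0}],
}
index_files = {2: [(101, 2), (102, 2)]}  # version 2 updated 101, deleted 102

compacted = compact(partitions, index_files, new_version=3)
print({v: [r["sell_id"] for r in rows] for v, rows in compacted.items()})  # {3: [101, 103]}
```

Requiring a fresh, larger version_id keeps the compacted data distinguishable from the partitions being dropped.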
Final Words
In this post, I have discussed only the high-level artifacts of the architecture. There are a lot of details involved. For example, I used a Bloom filter to check for the existence of a record with the same primary key. I also used Hive's bucketing feature to make the reader and writer more scalable.
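The post does not say how the Bloom filter was built, so what follows is only a generic Python sketch of the idea. It uses a standard bit-array Bloom filter (hypothetical `BloomFilter` class) sized with the usual formulas, with double hashing derived from SHA-256. The filter is filled with made-up primary keys standing in for the older version_id partitions. A negative answer is definitive, so the writer can classify the record as an insert without scanning older data. A positive answer may be a false positive and still needs the real lookup, modelled here by a set (hypothetical `classify` helper).

```python
import hashlib
import math


class BloomFilter:
    """Minimal Bloom filter over string keys (illustrative, not the project's code)."""

    def __init__(self, expected_items: int, false_positive_rate: float = 0.01):
        if expected_items <= 0:
            raise ValueError("expected_items must be positive")
        # Standard sizing: m = -n ln p / (ln 2)^2 bits, k = (m / n) ln 2 hashes.
        self.num_bits = max(8, math.ceil(
            -expected_items * math.log(false_positive_rate) / (math.log(2) ** 2)))
        self.num_hashes = max(1, round(self.num_bits / expected_items * math.log(2)))
        self.bits = bytearray((self.num_bits + 7) // 8)

    def _positions(self, key: str):
        # Double hashing: position_i = h1 + i * h2 (mod m).
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key))


def classify(key: str, bloom: BloomFilter, existing: set) -> str:
    """Insert or update? Only fall back to the expensive lookup on a 'maybe'."""
    if not bloom.might_contain(key):
        return "insert"  # definitely not in the older partitions
    return "update" if key in existing else "insert"


older_keys = {str(sell_id) for sell_id in range(1000)}  # made-up sell_id values
bf = BloomFilter(expected_items=len(older_keys))
for key in older_keys:
    bf.add(key)

print(bf.num_bits, bf.num_hashes)          # 9586 7
print(classify("5", bf, older_keys))       # update
print(classify("999999", bf, older_keys))  # insert
```

With a 1% target false-positive rate, roughly 99 out of 100 genuinely new keys skip the lookup against older data entirely.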
For commercial support for any solution in my GitHub repositories, please talk to ThirdEye Data Science Services. Support is available for Hadoop or Spark deployment on the cloud, including installation, configuration and testing.
I guess (as far as I know 🙂) we can implement the same feature with HBase as an external table in Hive.
Technically, yes. However, plugging a custom IO format into Hive is a lot simpler than having to drag in the whole HBase footprint.
I remember Hive supports “INSERT INTO TABLE”, which does not overwrite existing data.
“INSERT INTO” is available only from Hive version 0.8.0. The solution discussed in this post is based on Hive version 0.7.0. It is possible to come up with a similar solution based on “INSERT INTO” and without the version_id partitions.
We can use “INSERT OVERWRITE” with “CASE” to implement updates.
For example, the SQL “update user set uname='uname1' where uid=1” can be written as “INSERT OVERWRITE TABLE user SELECT uid, CASE WHEN uid=1 THEN 'uname1' ELSE uname END AS uname FROM user”. But it's very slow. How about your method with version_id? Does it run fast?
Hi Pranab,
I'm new to Hive and very interested in how to implement update/delete in Hive. Could you tell me more about the two questions below?
1. How did you check whether a record with the same primary key exists? Did you maintain other data files to look up this information efficiently? It seems difficult if the number of rows is large.
2. Is the version partition opaque to the user?
Thanks a lot.
Rick
Hi Rick
1. Older data is kept in older version partitions. They are looked up during the INSERT OVERWRITE with newer data to decide whether a record is an insert or an update. Bloom filters can be used to optimize the process.
2. Yes, version partitions are completely opaque to the end user.
Hi Pranab,
When did you update the Bloom filter? How long does it take if you build it at “load data” time?
Hi Pranab,
Thanks for the post, it's interesting.
Could you please clarify what value is passed to the version_id field in the view? Is the version_id field in the view just created without populating any value?
Sasidhar
You pass the version ID in the INSERT OVERWRITE query. You need some kind of sequence generator. One option is to use an RDBMS to generate a sequence and use that as the version ID.
Hi Pranab
Great blog and idea. Wouldn’t it be too costly to maintain so many partitions as they are based on sell_id, create_dt and version id?
Sriram
The section on compaction answers your question. Once a bunch of partitions have been compacted into one big partition, the old partitions can be deleted. This is very similar to what many NoSQL databases do.
Can you please provide example code for the custom writer and reader?
Ram, unfortunately the code is not open source and hence cannot be shared. If SQL is important to you, I would recommend exploring SQL-on-Hadoop engines like Impala or Presto instead of hacking Hive.
Pingback: Bulk Insert, Update and Delete in Hadoop Data Lake | Mawazo