Making Hive Squawk like a Real Database


Hive is great for large scale data warehousing applications. In one of my recent projects I was handed the interesting and challenging task of making Hive behave like an OLTP system, i.e., supporting updates and deletes. To be more specific, the Hive database needed to be kept in near real time synchronization with multiple OLTP systems. In this post, I will discuss the high level features of the solution, which is based on a custom Hive input and output format.

The Scenario

Imagine a national retail chain collecting sales data in the regional OLTP databases.  Periodically, the data from multiple OLTP systems gets fed into Hive, which aggregates all the data. The data arrival is frequent enough so that Hive is in near real time synchronization with the source OLTP systems.

Essentially, every time a record gets inserted, updated or deleted in the source OLTP system, it gets shipped to Hive. This process is also known as transactional replication. Here are the main issues I had to address to meet the requirements and arrive at a satisfactory solution.

  • Since data in Hive cannot be updated once it is loaded, how do we handle new inserts and updates to existing records?
  • While writing, how do we decide whether an incoming record is an insert or an update, and how do we handle it in Hive, which has no notion of updates or deletes?
  • For a query, how do we ensure that only the latest version of a record is returned?

Database Schema

Here is the Hive schema for the tables related to sales data. The incoming batch of data is loaded into the following staging table using the Hive LOAD statement.

CREATE EXTERNAL TABLE test.sell_stage
(
sell_id BIGINT, 
acct_id BIGINT, 
change_ts STRING, 
create_ts STRING, 
payment_type_id STRING, 
status_id STRING, 
total_amt DOUBLE, 
batch INT, 
op_code STRING,
store_id STRING, 
create_date STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION  '/user/pranab/db/stage/test/sell_stg';
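
For reference, a batch load into this staging table could look roughly like the statement below. The HDFS source path is a placeholder, not the actual path used in the project.

-- Hypothetical batch load; the source path is a placeholder. OVERWRITE assumes
-- each new batch replaces the previous contents of the staging table.
LOAD DATA INPATH '/user/pranab/db/incoming/sell_batch'
OVERWRITE INTO TABLE test.sell_stage;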

The main table holding the sales data is as follows

CREATE EXTERNAL TABLE test.sell
(
sell_id BIGINT, 
acct_id BIGINT, 
change_ts STRING, 
create_ts STRING, 
payment_type_id STRING, 
status_id STRING, 
total_amt DOUBLE , 
batch INT, 
op_code STRING
)
PARTITIONED BY (store_id STRING, create_date STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS RCFILE
LOCATION '/user/pranab/db/data/test/sell';

This table is partitioned by store_id and create_date. Partitioning in Hive plays a role similar to indexes in a regular database. The table stores its data as RCFile, a columnar storage format in HDFS.

A Naive Approach

If we execute a Hive insert overwrite query with the new data, it will simply replace the existing data in the sell table.

But we want to add the new data without disturbing the existing data in the sell table. We could take the union of the existing data in sell and the new data from sell_stage, and then insert overwrite into the sell table.

The query will look something like this

INSERT OVERWRITE TABLE test.sell PARTITION(store_id, create_date)
SELECT a.*
FROM (
select ss.* from test.sell_stage ss
union all
select sd.*  from test.sell sd
) a;

This works, but we still have a big problem.

The data in the sell table keeps growing, and the idea of reading all the existing data and rewriting it back into the table on every load is not very smart and will not scale.

A Better Way

The solution required the following changes

  1. Add another partition column called version_id. A new version_id corresponds to one or more batches. As we will see shortly, this lets us add new data non-intrusively, without touching existing data.
  2. Add new table properties designating the primary key columns, the op code column and the version id column. The primary key column definitions are used to identify a record among the existing data.
  3. Add a custom input and output format with an associated custom writer and reader. For an incoming record, the writer is responsible for finding out whether it is a new record or an update of an existing record. It is also responsible for tracking the latest version of a record. The reader is responsible for returning only the latest version of a record.

The schema changes described above are summarized below

PARTITIONED BY (store_id STRING, create_date STRING , version_id INT)
CLUSTERED BY (sell_id) SORTED BY (sell_id) INTO 32 BUCKETS  -- bucket count is illustrative
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS INPUTFORMAT 'com.hadoop.input.act.like.database.RCFileInputFormat'
OUTPUTFORMAT 'com.hadoop.input.act.like.database.RCFileOutputFormat'
LOCATION '/user/pranab/db/data/test/sell'
TBLPROPERTIES ('op_code_column' = 'op_code', 'version_id_column' = 'version_id', 'primary_key_columns' = 'sell_id');

New Partition

We want to write the new data into a new partition, so that the existing data in the sell table can be left untouched.

We introduce another partition column called version_id. Each version_id is unique and covers one or more batches of data added to the sell table.

The partitions for the sell table are now as follows

PARTITIONED BY (store_id STRING, create_date STRING, version_id INT)

With this new partition in place, our insert overwrite query will look like this

INSERT OVERWRITE TABLE test.sell PARTITION(store_id, create_date, version_id) 
SELECT * FROM test.sell_stage_view;

We use a view called sell_stage_view, which is essentially the table sell_stage with an extra version_id column at the end. This results in the new data going into a new version_id partition under the store_id and create_date partitions.
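
The original post does not show the view definition, but based on the staging schema it could be written roughly as below. The Hive variable used for the version ID and the idea of recreating the view for each load cycle are my assumptions; the post only says that the view adds a trailing version_id column, and a comment below mentions that the version ID comes from some external sequence generator.

-- Hypothetical view definition; the version ID is assumed to be supplied as a
-- Hive variable (e.g. SET version_id=12;) with the view recreated per load cycle
DROP VIEW IF EXISTS test.sell_stage_view;
CREATE VIEW test.sell_stage_view AS
SELECT
  sell_id, acct_id, change_ts, create_ts, payment_type_id, status_id,
  total_amt, batch, op_code,
  store_id, create_date,
  CAST('${hiveconf:version_id}' AS INT) AS version_id
FROM test.sell_stage;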

Writer

The writer gets invoked when a Hive insert overwrite query is executed. The custom writer logic for processing a record is as follows.

  1. Go through all the older version_id partitions under the store_id, create_date partition and check whether a record with the same primary key exists. If it does, it is an update; otherwise it is an insert.
  2. If it is an update, write the tuple (primary key, version id) into a file in the same directory where the current data file resides. We will call this file the version index file; it could be a sequence file or a map file.
  3. Write the record, unless it is a delete. A deleted record is assumed to arrive like any other record, with the op code set to “delete”; in that case we simply update the version index file and do not write the record.

Reader

The reader gets invoked when a select query is performed. The steps for our custom reader are as follows

  1. The input format class reads all the version index files with a version_id more recent than the version_id of the current data file. The data from all the version index files is loaded into an in-memory map in such a way that only the latest version_id of a given record is stored in the map.
  2. While processing a record, the reader checks the version index map to find out whether a later version of the same record exists in a later version_id partition. If so, the record is not returned, and the reader moves on to the next record read from the data file.
  3. Otherwise, it returns the record.
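
From the query side nothing changes. An ordinary select such as the hypothetical one below (the filter values are placeholders) returns only the latest version of each record, with all the version handling done transparently inside the custom reader.

-- Ordinary query; the version_id partition never appears in the predicate
SELECT sell_id, status_id, total_amt
FROM test.sell
WHERE store_id = '1234' AND create_date = '2012-06-15';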

Compaction

As you can imagine, as data accumulates, additional version ID partitions and version index files get created, which will impact performance. Compaction is particularly important for update heavy data.

The solution is to do an insert overwrite query where we read all the existing version ID partitions, and write them to a new version ID partition. Then we drop the old version ID partitions.
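
Sketched in HiveQL, one compaction cycle for a single store_id and create_date partition could look roughly like the statements below. The partition values, the new version ID of 20 and the old version ID of 10 are placeholders, and the ALTER TABLE statement would be repeated for each old version partition.

-- Roll all older versions into a new version_id partition; the custom reader
-- ensures that only the latest version of each record is read and rewritten
INSERT OVERWRITE TABLE test.sell PARTITION(store_id = '1234', create_date = '2012-06-15', version_id = 20)
SELECT sell_id, acct_id, change_ts, create_ts, payment_type_id, status_id, total_amt, batch, op_code
FROM test.sell
WHERE store_id = '1234' AND create_date = '2012-06-15' AND version_id < 20;

-- Drop the old version partitions once the compacted partition is written
ALTER TABLE test.sell DROP PARTITION (store_id = '1234', create_date = '2012-06-15', version_id = 10);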

Following compaction, only the latest version of a record is retained. Older versions are purged.

Final Words

In this post, I have discussed only the high level artifacts of the architecture. There are a lot of details involved. For example, I used a Bloom filter to check for the existence of a record with the same primary key. I also used Hive’s bucketing feature to make the reader and writer more scalable.

For commercial support for any solution in my github repositories, please talk to ThirdEye Data Science Services. Support is available for Hadoop or Spark deployment on the cloud, including installation, configuration and testing.

About Pranab

I am Pranab Ghosh, a software professional in the San Francisco Bay area. I manipulate bits and bytes for the good of living beings and the planet. I have worked with a myriad of technologies and platforms in various business domains for early stage startups, large corporations and anything in between. I am an active blogger and open source project owner. I am passionate about technology and green and sustainable living. My technical interest areas are Big Data, Distributed Processing, NOSQL databases, Machine Learning and Programming languages. I am fascinated by problems that don't have a neat closed form solution.

16 Responses to Making Hive Squawk like a Real Database

  1. SM says:

    I guess (as per my knowledge 🙂 ) we can implement the same feature with HBase as external table to Hive.

  2. Navis says:

    I remember hive supports “INSERT INTO TABLE” which does not overwrite existing data.

    • Pranab says:

      “INSERT INTO” is available from Hive version 0.80 only. The solution discussed in this post is based on Hive version 0.70. It is possible to come up with a similar solution based on “INSERT INTO” and without the version_id partitions.

      • rick says:

        We can use “insert overwrite ” with “case” to implement update.
        For example, sql “update user set uname=’uname1′ where uid=1” can use “INSERT OVERWRITE TABLE user SELECT uid, CASE WHEN uid=1 THEN ‘name1’ ELSE uname END AS UNAME from user”. But it’s very slow. How about your method with version_id? Does it run fast?


  3. rick says:

    hi, Pranab
    I’m a newbie of Hive and very interested in how to implement update/delete on Hive . Could you tell me more information about 2 questions as below.
    1. How did you check if a record with the same primary key exists? Did you maintain other data files to look up the info efficiently? It seems difficult if the row number is large.
    2. Is version partition opaque to the user?
    Thanks a lot.
    Rick

    • Pranab says:

      Hi Rick
      1. Older data is kept in older version partitions. They are looked up during the insert overwrite with newer data to decide whether it’s an insert or an update. Bloom filters can be used to optimize the process.
      2. Yes, version partitions are completely opaque to the end user

      • rick says:

        Hi, Pranab
        when did you update the bloom filter ? How long does it take if you build it at “load data” ?

  4. Sasidhar says:

    Hi Pranab,
    Thanks for the post, it’s interesting.
    Could you please clarify what value is passed to the version_id field in the view? Is the version_id field in the view just created without populating any value?

    • Pranab says:

      Sasidhar
      You pass the version ID in the insert overwrite query. You need some kind of sequence generator. One option is to use an RDBMS to generate a sequence and use that as the version ID.

  5. Sriram says:

    Hi Pranab
    Great blog and idea. Wouldn’t it be too costly to maintain so many partitions as they are based on sell_id, create_dt and version id?

    • Pranab says:

      Sriram
      The section under compaction answers your question. Once a bunch of partitions have been compacted into one big partition, the old partitions can be deleted. This is very similar to what many NOSQL databases do

  6. Ram says:

    Can you please provide example code for the custom writer and reader?

    • Pranab says:

      Ram, Unfortunately the code is not open source and hence can not be shared. If SQL is important to you, I would recommend exploring newSQL databases like Impala or Presto, instead of hacking Hive.

