Bulk Insert, Update and Delete in Hadoop Data Lake


A Hadoop data lake, unlike a traditional data warehouse, does not enforce schema on write and serves as a repository of data in different formats from various sources. If the data collected in a data lake is immutable, it simply accumulates in an append-only fashion and is easy to handle. Such data tends to be fact data, e.g., user behavior tracking data or sensor data.

However, dimension data or master data, e.g., customer data or product data, will typically be mutable. It generally arrives in batches from some external source system, reflecting incremental inserts, updates and deletes in that system. All incremental changes need to be consolidated with the existing master data. This is a problem faced in many Hadoop or Hive based Big Data analytic projects. Even the latest version of Hive does not offer workable bulk update or delete support for this case. This post is about a Map Reduce job that performs bulk insert, update and delete on data in HDFS. The implementation is part of the open source project chombo.

Hive HQL Does Not Help

Even if you are using the latest version of Hive, you are not out of the woods. The latest version of Hive HQL has support for updates and deletes, but that does not help here. What we get from the external system are data feeds containing the records that were inserted, updated and deleted.

It’s not feasible to translate such a feed into a set of corresponding HQL statements. Even if you could do that, Hive would be extremely inefficient, because it runs a map reduce job for each such HQL statement. You get the picture: it looks like a dead end.

As we will see later, you can use the Map Reduce job described in this post whether you are using Hive or doing custom data science work directly on Hadoop.

As I mentioned earlier, immutable, i.e., insert-only, data does not pose any special problem. All you have to do is drop the new files into the Hadoop or Hive directory containing the existing files. No additional work is necessary.

Hive Input Output Format

In one of my earlier posts, I outlined a solution specifically for Hive with the aid of a custom input output format. The key idea is to maintain multiple versions of a record, making use of Hive dynamic partitions.

A compactor, run periodically, gets rid of the older versions of the data, i.e., the older dynamic partitions. The latest version of the data is consolidated into a new dynamic partition.

Bulk Mutation Map Reduce

The implementation is in the class RecordSetBulkMutator. Let’s go through some key ideas on how it works. The current master data in Hadoop holds the latest version of all records for some entity, e.g., product. The incoming incremental feed may contain one or more mutations for existing records as well as new records. Every record has a unique ID.

Essentially, we join the two data sets, i.e., the master and the incremental data, on the ID and do a secondary sort on the temporal ordering field.

On the reducer side, for a given record key, we get all the mutations in the value list. The last record in the list is the latest mutation, i.e., the latest version of the record; it is emitted if it’s an update and not emitted if it’s a delete. For an insert, there will be only one record, which will be emitted. Essentially it’s a merge and compact process.

For example, if the feed contains 2 updates for a record, the value list will contain 3 records: the first one is the record from the existing master data and the other two are the updates from the incremental data feed. We simply emit the last record, because that is the latest version.

In the example above, if the last record corresponded to a delete, then we would not have emitted anything from the reducer.
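To make the merge and compact logic concrete, here is a minimal Python sketch of what effectively happens on the reducer side. It is only an illustration, not the actual chombo implementation, which is a Java Map Reduce job; the field positions, the delete op code and the input file name are assumptions.

# merge and compact sketch: keep only the latest version of each record
# assumed layout: ID in column 0, temporal ordering field in column 5,
# optional op code column for identifying deletes
DELETE_OP_CODE = "D"

def compact(versions, temporal_ordinal=5, op_code_ordinal=None):
    # secondary sort: order all versions of a record by the temporal field
    ordered = sorted(versions, key=lambda rec: int(rec[temporal_ordinal]))
    latest = ordered[-1]
    # a delete suppresses the record; an insert or update emits the latest version
    if op_code_ordinal is not None and latest[op_code_ordinal] == DELETE_OP_CODE:
        return None
    return latest

# group records (master plus incremental) by ID, then compact each group
groups = {}
for line in open("merged_input.txt"):   # hypothetical combined input
    rec = line.strip().split(",")
    groups.setdefault(rec[0], []).append(rec)

for rec_id, versions in groups.items():
    latest = compact(versions)
    if latest is not None:
        print(",".join(latest))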

Temporal Ordering

We have assumed that each record contains a field representing the temporal ordering of the mutations. It could be a time stamp or a sequence number. We are assuming that the source system has such a field and that it is properly populated when data is inserted, updated or deleted.

What if the source system does not have such a field? There are a few options in that case. On receipt of a data file on the data lake side, we could insert a temporal ordering field ourselves. One easy solution is to create an epoch time stamp, assign it to the first record and assign incremented values to the rest of the records, as in the sketch below. This will work provided the mutations were done in the source system in the same order as they appear in the incremental data file.
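Here is a minimal sketch of that option, assuming a comma delimited feed; the file names are hypothetical.

import time

# append a generated temporal ordering field to each record of an
# incremental feed that lacks one; the first record gets the current
# epoch time stamp and subsequent records get incremented values,
# preserving the order in which the mutations appear in the file
def add_temporal_field(in_path, out_path):
    ts = int(time.time())
    with open(in_path) as fin, open(out_path, "w") as fout:
        for offset, line in enumerate(fin):
            fout.write("%s,%d\n" % (line.rstrip("\n"), ts + offset))

add_temporal_field("product_incr.csv", "product_incr_with_ts.csv")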

Another option is to rename the file so that it ends with an underscore followed by a time stamp or a sequence number. With this approach, we are assigning the same time stamp to all the records in the file. One downside of this approach is that if there are multiple mutations of the same record in the file, one of them will be picked arbitrarily on the reducer side, which may not be the latest.
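A sketch of the renaming option, again with a hypothetical file name:

import os, time

# tag the whole file with one time stamp by appending _<epoch> to its name,
# so that every record in it gets the same temporal ordering value
def add_timestamp_suffix(path):
    new_path = "%s_%d" % (path, int(time.time()))
    os.rename(path, new_path)
    return new_path

add_timestamp_suffix("product_incr.csv")   # e.g. product_incr.csv_1428415316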

Handling Deletes

We need a way to identify deletes. As we saw, on the reducer side, for a given record ID, if the last record has the mutation type delete, we don’t emit the record. There are two options for identifying deletes. The first is to have a column in the data containing an op code. In the reducer, the op code is used to determine whether the last record for a given key corresponds to a delete.

An easier option is to prefix the name of the data file containing the deletes with a special string that can be set through a configuration parameter. This works as long as all the deleted records are in a separate file.

Example

I have run the bulk mutation map reduce for some product data containing the following fields:

  1. Product ID
  2. Category
  3. Brand
  4. Price
  5. Inventory
  6. Time stamp

I have chosen the option where the data has a temporal ordering field, which is the last field. I have one incremental data file containing a mixture of inserts and updates and another containing deletes only.

For handling deletes, I have chosen the option of prefixing the names of the delete-only files with a string that’s defined through a configuration parameter. Here are all the configuration parameters. The record ID can be a composite of multiple columns; in that case, you need to provide a comma separated list of column ordinals for id.field.ordinals.

#op.code.field.ordinal=4
deleted.op.code=D
deleted.record.file.prefix=del
temporal.ordering.field.ordinal=5
temporal.ordering.field.numeric=true
id.field.ordinals=0

Here are some snippets of the master data before and after the job ran, showing a record being updated. You can see that the product with the ID VR8NCNE7DV has its inventory level modified in the second set of records, which is from the output of the map reduce job.

R217970F17,wearables,misfit,103,21,1427761080
6QD0BAVS7I,laptop,asus,502,54,1427764070
VR8NCNE7DV,wearables,fitbit,112,88,1427766458
3SDS30A11P,laptop,thinkpad,551,54,1427770906
.............................................
VOA31MCU9I,cell phone,apple,150,43,1427644188
VR8NCNE7DV,wearables,fitbit,112,82,1428415316
VRN5D60451,tablet,amazon kindle,258,96,1428527865
VSE72T0P4M,tablet,samsung,294,51,1426578803

A Simple Workflow

A simple workflow, as described below, can be used to merge incremental data with the base data using the map reduce job. It involves some HDFS file juggling and running the MR job.

  1. Base data in HDFS directory …/product/master
  2. Incremental data files as they arrive are dumped into …/product/incr
  3. Move current set of incremental data files to …/product/working
  4. Run MR job taking input from …/product/master and …/product/working
  5. Output of the MR job goes to …/product/output
  6. If temporal ordering is being done using the file name suffix, then an underscore and an epoch time stamp or sequence number should be appended to each output file name
  7. Move current base data files from …/product/master to …/product/backup
  8. Move new base data files from …/product/output to …/product/master

Step 3 is necessary because we want to run the MR job on a current snapshot of the incremental data files. It prevents the MR job from being affected by newly arriving incremental data files while the job is running.

Steps 7 and 8 should execute quickly, since they involve only file moves, so the impact on any running MR job or Hive query will be minimal. Ideally, steps 7 and 8 should be performed as an atomic operation.
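Here is a rough sketch of a driver script for this workflow, using the HDFS shell from Python. The directory paths, the jar name, the driver class name and the argument order are assumptions and should be replaced with the actual chombo invocation; step 6, appending the time stamp suffix to the output file names, is omitted for brevity.

import subprocess

# hypothetical HDFS layout
BASE, INCR, WORKING = "/product/master", "/product/incr", "/product/working"
OUTPUT, BACKUP = "/product/output", "/product/backup"

def hdfs(*args):
    # run an HDFS shell command, e.g. hdfs dfs -mv <src> <dst>
    subprocess.check_call(["hdfs", "dfs"] + list(args))

# step 3: snapshot the incremental files so newly arriving files don't interfere
hdfs("-mv", INCR + "/*", WORKING)

# steps 4 and 5: run the bulk mutation MR job on master plus working data
# (jar name, class name and argument order are placeholders)
subprocess.check_call(["hadoop", "jar", "chombo.jar", "RecordSetBulkMutator",
                       BASE, WORKING, OUTPUT])

# steps 7 and 8: back up the old base data and swap in the new base data
hdfs("-mv", BASE + "/*", BACKUP)
hdfs("-mv", OUTPUT + "/*", BASE)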

With Hive, instead of …/product/master, the Hive external table data directory or the warehouse data directory should be used.

Hive Partition

If you are using Hive with the data partitioned, you need to partition the incremental data file the same way and create multiple partitioned incremental data files. Then you need to run the MR job multiple times, once for each partition.

Each run will take as input the base data for a partition and the corresponding incremental data for the same partition. Essentially, you have to repeat a workflow very similar to the one described above for each Hive partition.

Since the incremental data files are not likely to be very big, the easiest thing might be to write a Python script to partition the data.
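Here is a minimal sketch of such a script, assuming a comma delimited feed and using the category column (column 1) as the partition key; the file names and the choice of partition column are only illustrative.

# split an incremental feed into one file per Hive partition value
def partition_file(in_path, partition_ordinal=1):
    writers = {}
    with open(in_path) as fin:
        for line in fin:
            key = line.split(",")[partition_ordinal]
            if key not in writers:
                writers[key] = open("incr_%s.csv" % key.replace(" ", "_"), "w")
            writers[key].write(line)
    for w in writers.values():
        w.close()

partition_file("product_incr.csv")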

Scaling with Partitioning

As the merged master data volume grows, scaling may become an issue. The master data should become up to date as quickly as possible after the arrival of the incremental data. Intelligent partitioning of the data can make the merge and compact process more scalable. Even if you are not using Hive, you can still partition your HDFS master data files.

Partitioning should be done in a way that minimizes the number of partitions affected by the merge and compact process. For example, we could partition by create date, based on the intuition that mutations tend to happen more often with recent data. If this intuition turns out to be true, only a small fraction of the master partitions will be involved in the merge and compact process.

What About Other Hive Storage Formats

The bulk mutation map reduce job uses field delimited flat text data as input and produces field delimited flat text data as output. What if the Hive tables are in ORC, Parquet or some other storage format? You can follow these steps to handle other storage formats.

  1. Export the Hive table data to a separate table with delimited text storage format using insert overwrite
  2. Run the bulk mutation map reduce job as outlined earlier, using the output of step 1. The output of this step will be delimited text files.
  3. Treating the output of the map reduce job in step 2 as a Hive table with delimited text storage format, run insert overwrite to create Hive tables of the desired storage format.

If the bulk mutation map reduce job is the only way data is being merged, then step 1 needs to be performed only once.

For subsequent executions, instead of running step 1, you can take the output of step 2 from the previous run and use it as input to step 2 of the current run. Essentially, you are maintaining a parallel Hive table with delimited text storage format.
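Here is a sketch of the round trip, driving Hive from Python through the Hive command line. The table names are hypothetical, and both tables are assumed to already exist with the same columns.

import subprocess

def run_hql(query):
    # execute an HQL statement with the Hive CLI
    subprocess.check_call(["hive", "-e", query])

# step 1: export the ORC (or Parquet) table to a delimited text table
run_hql("INSERT OVERWRITE TABLE product_text SELECT * FROM product_orc")

# step 2: run the bulk mutation MR job on the text table's HDFS directory,
# as in the workflow described earlier

# step 3: load the merged text data back into the ORC table
run_hql("INSERT OVERWRITE TABLE product_orc SELECT * FROM product_text")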

Summing Up

Here is a tutorial document, in case you want to try it out. The solution pattern used here is widely used by many NOSQL databases: multiple versions of a document, or of a column of a row, resulting from mutations are maintained. During a query, the database returns the latest version of the data. A compactor runs periodically to remove the older versions of the data.

About Pranab

I am Pranab Ghosh, a software professional in the San Francisco Bay area. I manipulate bits and bytes for the good of living beings and the planet. I have worked with a myriad of technologies and platforms in various business domains for early stage startups, large corporations and everything in between. I am an active blogger and open source project owner. I am passionate about technology and green and sustainable living. My technical interest areas are Big Data, Distributed Processing, NOSQL databases, Machine Learning and Programming languages. I am fascinated by problems that don't have a neat closed form solution.

19 Responses to Bulk Insert, Update and Delete in Hadoop Data Lake

  1. Pingback: Bulk Insert, Update and Delete in Hadoop Data Lake | Mawazo | Big Data Cloud

  2. Mitra says:

    I would like to capture the historical data as well, i.e., implement a slowly changing dimension (SCD) type 1 for a few of the Hive tables. How should I proceed in such a case?

    Thank you,
    Mitra

  3. ravi muthyala says:

    This is a nice explanation. Even I would like to know more about capturing the historical data using Hive and HDFS files.

  4. Anandh kumar says:

    Hive doesn’t support DML queries by default; you can enable the ACID properties to do this.

    Set these properties:

    SET hive.support.concurrency=true;
    SET hive.enforce.bucketing=true;
    SET hive.exec.dynamic.partition.mode=nonstrict;
    SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
    SET hive.compactor.initiator.on=true;
    SET hive.compactor.worker.threads=1;
    SET hive.support.concurrency=true;
    SET hive.enforce.bucketing=true;

    Then create a Hive table using bucketing, stored as ORC file format,
    then do the update and delete queries in Hive.
    For more information on Hive DML, go to this blog for a step by step process:
    http://www.geoinsyssoft.com/hive-dml-update-delete-operations/

  5. Pranab says:

    @Anandh
    As I explained in this post, there are scenarios where you cannot even define the update HQL query you need.
    Consider a batch of 5000 records, out of which 1000 already exist, i.e., they require an update. First you have to find the IDs that already exist, and then you have to formulate a “where in ()” clause for the update query. AFAIK, Hive does not support “in” in the where clause. You could execute “where id = …” queries 1000 times, but it’s just not practical.

  6. Baswaraj says:

    @Pranab,
    I have a unique use case:
    We are trying to build a data lake using HDFS.
    – 200+ base tables with a few billion records / a few TB of data.
    – Support incremental insert/update with an SLA of less than 30 minutes.
    I feel this is impossible to achieve using HDFS (or Hive, for that matter).
    I tried a small POC using a few tables, but didn’t get updates available within the given SLA.

    I also tried a POC using a key-value store (e.g., Couchbase) as the data lake and was able to get updates within the given SLA.
    But we would be happier if we could get this SLA with HDFS as the data lake.
    Do you have any other suggestions/techniques where HDFS can be used to achieve this SLA for the given size of data?

    • Pranab says:

      Did you try the technique outlined in this post? The only thing I can think of is tuning HDFS for better performance.
      You can try increasing the cluster size. BTW, latency will increase as the existing data size grows, because the incremental update involves joining with the existing data.
      The current implementation does a reduce side join and shuffle. If the size of the incremental batch is reasonable, you could try a map side join.

      • Baswaraj says:

        Thanks for your reply!
        We tried using only HDFS, no Hive at all.
        We iterate over the existing files to locate updates and re-write the existing data, with the updates, to a new file. Disk I/O is the limitation for scaling, as we need to scan and re-write the files.
        I did try partitioning on HDFS, but that didn’t help much when the incremental updates touch all partitions!

  7. sudarshan says:

    Hello Pranab,
    Great explanation. I have the same use case, but the only difference is that I have incremental records in order; that is, for one key there might be several updates, and all updates have to be applied in sequence, because one update will have info about the coming updates.
    How can I achieve that?
    Do I have to compare each update record with the main files or records every time?

    Regards
    Sudarshan

    • Pranab says:

      As a new update arrives, you run the MR job using the current update file and the output from the last run of the MR job. You run the MR job once every time new updates arrive, in the same sequence.

      • sudarshan says:

        Yes Pranab, but the problem is that I have 5k files and updates arrive at 10 XML files per minute. So what do you think about that?

      • Pranab says:

        Sudarshan, you can write a script that orders the files by time stamp and runs the MR job taking the incremental files as input in that same order. But your cluster should be capable of running and completing the MR job 10 times per minute.
        Another option is to concatenate a few incremental files following the time order and run the MR job on the concatenated file. Processing will be faster.
        A third option would be for me to modify the MR job so that it can handle multiple incremental files. I will look into that and update you.

  8. sudarshan says:

    Thanks Pranab, I will look into the details of what you suggested… Will wait for your reply…

    • Pranab says:

      Sudarshan, I checked the implementation. It will work for multiple incremental files, as long as the sequence numbers appended to the file names are in the right order, if you are choosing the file name based option. For a given record, the record from the file with the highest sequence number as suffix (i.e., the latest update) will prevail.

  9. Why not admit that for an EDW, SCD Type 1, Type 2, etc. will not work in Hive? For a large org there will be many fact and dimension tables with data in excess of 100 GB, and some fact tables with TBs of data; none of the solutions will work. Fact tables can also be type 2.

  10. BlueThinker says:

    Is it a good solution to use a document DB (CosmosDB, MongoDB or another) to capture the updates and then use them in Hive together with the transactional data?
