A Hadoop data lake, unlike a traditional data warehouse, does not enforce schema on write; it serves as a repository for data in different formats from various sources. If the data collected in a data lake is immutable, it simply accumulates in an append-only fashion and is easy to handle. Such data tends to be fact data, e.g., user behavior tracking data or sensor data.
However, dimension or master data, e.g., customer data or product data, will typically be mutable. Generally it arrives in batches from some external source system, reflecting incremental inserts, updates, and deletes in that system. All incremental changes need to be consolidated with the existing master data. This is a problem faced in many Hadoop or Hive based big data analytic projects. Even the latest version of Hive has no bulk update or delete support. This post is about a Map Reduce job that performs bulk insert, update, and delete on data in HDFS. The implementation is part of the open source project chombo.
Hive HQL Does Not Help
Even if you are using the latest version of Hive, you are not out of the woods. The latest version of Hive HQL has support for updates and deletes, but that does not help. What we have from the external system are data feeds containing the records that were inserted, updated, and deleted.
It's not feasible to translate them into a set of corresponding HQL statements. Even if you could, Hive would be extremely inefficient, because it would run a Map Reduce job for each such HQL statement. I think we are getting the picture. It looks like a dead end.
As we will see later, you could use the Map Reduce job described in this post, whether you are using Hive or doing some custom data science work directly in Hadoop.
As I mentioned earlier, immutable, i.e., insert-only, data does not pose any special problem. All you have to do is drop such files into the Hadoop or Hive directory containing the existing files. No additional work is necessary.
Hive Input Output Format
In one of my earlier posts, I outlined a solution specifically for Hive with the aid of a custom input/output format. The key idea is to maintain multiple versions of a record, making use of Hive dynamic partitions.
A compactor, run periodically, gets rid of older versions of the data, i.e., older dynamic partitions. The latest version of the data is consolidated into a new dynamic partition.
Bulk Mutation Map Reduce
The implementation is in the class RecordSetBulkMutator. Let's go through some key ideas on how it works. In the current master data in Hadoop, we have the latest version of all records for some entity, e.g., product. The incoming incremental feed may contain one or more mutations for existing records as well as new records. Every record has a unique ID.
Essentially, we join the two data sets, i.e., master and incremental data, on the ID and do a secondary sort on the temporal ordering field.
On the reducer side, for a given record key, we get all the mutations in the value list. The last record in the list is the latest mutation, i.e., the latest version of the record; it is emitted if it's an update and not emitted if it's a delete. For an insert, there will be only one record, which will be emitted. Essentially it's a merge and compact process.
For example, if the feed contained 2 updates for a record, the values list will contain 3 records: the first is the record from the existing master data, and the other two are the two updates from the incremental data feed. We simply emit the last record, because that is the latest version.
In the example above, if the last record corresponded to a delete, then we would not have emitted anything from the reducer.
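The reducer's merge and compact logic can be sketched as below. This is an illustrative Python simulation, not the actual RecordSetBulkMutator code; the record layout (ID, payload, op code, time stamp) and the op code "D" for deletes are assumptions for the example.

```python
# Simulate the reducer side of the merge and compact process.
# Each record: (id, payload, op_code, timestamp). All versions of a
# record arrive under the same key; the secondary sort orders them by
# the temporal ordering field, and only the latest version survives.

def merge_compact(records):
    """Group records by ID, keep the latest version, drop deletes."""
    by_id = {}
    for rec in records:
        by_id.setdefault(rec[0], []).append(rec)

    output = []
    for rec_id, versions in by_id.items():
        versions.sort(key=lambda r: r[3])   # secondary sort on timestamp
        latest = versions[-1]
        if latest[2] != "D":                # op code "D" marks a delete
            output.append(latest)
    return output

# One master record with two updates from the feed, plus a record
# whose latest mutation is a delete
records = [
    ("VR8NCNE7DV", "inventory=88", "U", 1427766458),  # existing master
    ("VR8NCNE7DV", "inventory=85", "U", 1428000000),  # update 1
    ("VR8NCNE7DV", "inventory=82", "U", 1428415316),  # update 2
    ("3SDS30A11P", "inventory=54", "U", 1427770906),  # existing master
    ("3SDS30A11P", "inventory=54", "D", 1428500000),  # delete
]
merged = merge_compact(records)
```

Only the latest version of VR8NCNE7DV is emitted, and 3SDS30A11P disappears entirely because its last mutation is a delete.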
Temporal Ordering
We assumed that each record contains a field that represents the temporal ordering of the mutations. It could be a time stamp or a sequence number. We are assuming that the source system has such a field and that it's properly populated when data is updated or deleted.
What if the source system does not have such a field? There are a few options in that case. On receipt of a data file on the Hadoop data lake side, we could insert a temporal ordering field. One easy solution is to take the current epoch time stamp, assign it to the first record, and assign incremented values to the rest of the records. This will work provided the mutations were done in the source system in the same order as they appear in the incremental data file.
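Injecting such a field on the data lake side might look roughly like this; the helper name and CSV layout are illustrative, not part of chombo.

```python
import time

def add_temporal_ordering(lines, base_ts=None):
    """Append an epoch-based ordering value to each delimited record,
    incrementing by one per record so that file order is preserved
    as mutation order."""
    ts = int(time.time()) if base_ts is None else base_ts
    return [f"{line},{ts + i}" for i, line in enumerate(lines)]

# Two mutations of the same product, in file order
stamped = add_temporal_ordering(
    ["VR8NCNE7DV,wearables,fitbit,112,85",
     "VR8NCNE7DV,wearables,fitbit,112,82"],
    base_ts=1428415315)
```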
Another option is to rename the file so that it ends with an underscore followed by a time stamp or a sequence number. With this approach, we are assigning the same time stamp to all the records in the file. One downside of this approach is that if there are multiple mutations on the same record, one of them will be picked arbitrarily on the reducer side, which may not be the latest.
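The file name convention can be applied, and parsed back, roughly as follows; the function names are illustrative only.

```python
import time

def stamp_file_name(name, ts=None):
    """Append _<epoch timestamp> to a file name so that all records in
    the file share one temporal ordering value."""
    ts = int(time.time()) if ts is None else ts
    return f"{name}_{ts}"

def extract_time_stamp(name):
    """Recover the temporal ordering value from the trailing _<timestamp>."""
    return int(name.rsplit("_", 1)[1])

stamped = stamp_file_name("product_incr", ts=1428415316)
```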
Handling Deletes
We need a way to identify deletes. As we saw, on the reducer side, for a given record ID, if the last record has the mutation type delete, then we don't emit the record. There are two options for identifying deletes. We could have a column in the data containing an op code. In the reducer, the op code can be used to determine whether the last record for a given key corresponds to a delete.
An easier option is to prefix the name of a data file containing only deletes with a special string, configurable through a configuration parameter. This works as long as all the deleted records are in a separate file.
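Detecting delete-only files by name prefix is simple; the sketch below is illustrative, with the prefix value taken from the deleted.record.file.prefix parameter used in the example configuration.

```python
DELETED_RECORD_FILE_PREFIX = "del"   # value of deleted.record.file.prefix

def is_delete_file(file_name):
    """All records in a file whose name starts with the configured
    prefix are treated as deletes on the reducer side."""
    return file_name.startswith(DELETED_RECORD_FILE_PREFIX)

flags = [is_delete_file("del_product_20150407"),
         is_delete_file("product_20150407")]
```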
Example
I have run the bulk mutation Map Reduce job on some product data containing the following fields:
- Product ID
- Category
- Brand
- Price
- Inventory
- Time stamp
I have chosen the option where the data has a temporal ordering field, which is the last field. I have one incremental data file containing a mixture of inserts and updates, and another containing only deletes.
For handling deletes, I have chosen the option of prefixing the names of the delete-only files with a string defined through a configuration parameter. Here are all the configuration parameters. The record ID can be a composite of multiple columns; in that case, you need to provide a comma separated list of column ordinals for id.field.ordinals.
#op.code.field.ordinal=4
deleted.op.code=D
deleted.record.file.prefix=del
temporal.ordering.field.ordinal=5
temporal.ordering.field.numeric=true
id.field.ordinals=0
Here are some snippets of the master data before and after the job ran, showing a record being updated. You can see that the product with ID VR8NCNE7DV has its inventory level modified in the second set of records, which is from the output of the Map Reduce job.
R217970F17,wearables,misfit,103,21,1427761080
6QD0BAVS7I,laptop,asus,502,54,1427764070
VR8NCNE7DV,wearables,fitbit,112,88,1427766458
3SDS30A11P,laptop,thinkpad,551,54,1427770906
.............................................
VOA31MCU9I,cell phone,apple,150,43,1427644188
VR8NCNE7DV,wearables,fitbit,112,82,1428415316
VRN5D60451,tablet,amazon kindle,258,96,1428527865
VSE72T0P4M,tablet,samsung,294,51,1426578803
A Simple Workflow
A simple workflow, as described below, can be used to merge incremental data with the base data using the Map Reduce job. It involves some HDFS file juggling and running the MR job.
1. Base data lives in the HDFS directory …/product/master
2. Incremental data files, as they arrive, are dumped into …/product/incr
3. Move the current set of incremental data files to …/product/working
4. Run the MR job, taking input from …/product/master and …/product/working
5. The output of the MR job goes to …/product/output
6. If temporal ordering is done with a file name suffix, then an underscore and an epoch time stamp or sequence number should be appended to each output file name
7. Move the current base data files from …/product/master to …/product/backup
8. Move the new base data files from …/product/output to …/product/master
Step 3 is necessary because we want to run the MR job on a current snapshot of the incremental data files. It prevents the MR job from being affected by newly arriving incremental data files while the job is running.
Steps 7 and 8 should execute quickly, since they involve only file moves, so the impact on any running MR job or Hive query will be minimal. Ideally, steps 7 and 8 should be performed as an atomic operation.
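The file juggling in the workflow can be sketched as below. This simulation uses local directories to stand in for the HDFS paths, and a dummy merge function in place of the MR job, so everything beyond the directory names from the workflow is an assumption.

```python
import shutil
import tempfile
from pathlib import Path

def run_merge_workflow(root, merge_job):
    """Simulate steps 3-8: snapshot incrementals into working, run the
    job over master + working, back up the old master, promote the
    job output as the new master."""
    master, incr = root / "product/master", root / "product/incr"
    working, output = root / "product/working", root / "product/output"
    backup = root / "product/backup"
    for d in (working, output, backup):
        d.mkdir(parents=True, exist_ok=True)

    for f in incr.iterdir():                 # step 3: snapshot incrementals
        shutil.move(str(f), str(working / f.name))
    merge_job(master, working, output)       # steps 4-5: the MR job
    for f in master.iterdir():               # step 7: back up old master
        shutil.move(str(f), str(backup / f.name))
    for f in output.iterdir():               # step 8: promote new master
        shutil.move(str(f), str(master / f.name))

root = Path(tempfile.mkdtemp())
(root / "product/master").mkdir(parents=True)
(root / "product/incr").mkdir(parents=True)
(root / "product/master/part-0000").write_text("old\n")
(root / "product/incr/feed1").write_text("new\n")

def fake_merge(master, working, output):
    # stand-in for the bulk mutation MR job: concatenate all inputs
    merged = "".join(f.read_text()
                     for d in (master, working) for f in d.iterdir())
    (output / "part-0000").write_text(merged)

run_merge_workflow(root, fake_merge)
result = (root / "product/master/part-0000").read_text()
```

After the run, the old master sits in …/product/backup and the merged output has become the new master.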
With Hive, instead of ../product/master, Hive external table data directory or the warehouse data directory should be used.
Hive Partition
If you are using Hive with partitioned data, you need to partition the incremental data file the same way, creating multiple partitioned incremental data files. Then you need to run the MR job multiple times, once for each partition.
Each run will take as input the base data for a partition and the corresponding incremental data for the same partition. Essentially, you have to repeat a workflow very similar to the one described above for each Hive partition.
Since the incremental data files are not likely to be too big, the easiest approach might be to write a Python script to partition the data.
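Such a partitioning script might look like this; the choice of category as the partition column and the output layout are assumptions for illustration.

```python
from collections import defaultdict

def partition_records(lines, part_col):
    """Split delimited records into per-partition lists, keyed by the
    value of the partition column."""
    parts = defaultdict(list)
    for line in lines:
        key = line.split(",")[part_col]
        parts[key].append(line)
    return dict(parts)

incr = [
    "VR8NCNE7DV,wearables,fitbit,112,82,1428415316",
    "6QD0BAVS7I,laptop,asus,502,54,1427764070",
    "R217970F17,wearables,misfit,103,21,1427761080",
]
# partition by the category column (ordinal 1)
parts = partition_records(incr, part_col=1)
# each partition's records would then be written to its own file
# and merged against the base data of the matching partition
```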
Scaling with Partitioning
As the merged master data volume grows, scaling may become an issue. The master data should become up to date as quickly as possible after the arrival of the incremental data. Intelligent partitioning of the data can make the merge and compact process more scalable. Even if you are not using Hive, you can still partition your HDFS master data files.
Partitioning should be done in a way that minimizes the number of partitions affected by the merge and compact process. For example, we could partition by create date, based on the intuition that mutations tend to happen more with recent data. If this intuition turns out to be true, only a small fraction of the master data partitions will be involved in the merge and compact process.
What About Other Hive Storage Formats
The bulk mutation Map Reduce job takes field-delimited flat text data as input and produces field-delimited flat text data as output. What if the Hive tables are in ORC, Parquet, or some other storage format? You can follow these steps to handle other storage formats:
1. Export the Hive table data to a separate table with delimited text storage format, using insert overwrite.
2. Run the bulk mutation Map Reduce job as outlined earlier, using the output of step 1. The output will be delimited text files.
3. Treating the output of step 2 as a Hive table with delimited text storage format, run insert overwrite to create Hive tables of the desired storage format.
If the bulk mutation Map Reduce job is the only way data is being merged, then step 1 needs to be performed only once.
For subsequent executions, instead of running step 1, you can take the output of step 2 from the previous run and use it as input to step 2 of the current run. Essentially, you are maintaining a parallel Hive table with delimited text storage format.
Summing Up
Here is a tutorial document, in case you want to try it out. The solution pattern used here is widely used by many NoSQL databases: multiple versions of a document, or of a column of a row, resulting from mutations are maintained. During a query, the database returns the latest version of the data. A compactor runs periodically to remove the older versions of the data.
I would like to capture the historical data as well, i.e., implement a slowly changing dimension (SCD) type 1 for a few of the Hive tables. How should I proceed in such a case?
Thank you,
Mitra
This is a nice explanation. Even I would like to know more about capturing the historical data using Hive and HDFS files.
Hive doesn't support DML queries by default.
To enable ACID support for this,
set these properties:
SET hive.support.concurrency=true;
SET hive.enforce.bucketing=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;
Then create a Hive table using bucketing, stored in the ORC file format,
and then run the update and delete queries in Hive.
For more information on Hive DML, go to this blog for a step-by-step process:
http://www.geoinsyssoft.com/hive-dml-update-delete-operations/
@Anandh
As I explained in this post, there are scenarios where you cannot even define the update HQL query you need.
Consider a batch of 5000 records, out of which 1000 are existing, i.e., they require updates. First you have to find the IDs that already exist, and then you have to formulate a "where in ()" clause for the update query. AFAIK, Hive does not support "in" in the where clause. You could execute a "where id = …" query 1000 times, but it's just not practical.
btw, “IN” is supported in Hive since 0.13:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SubQueries
Even if Hive supports "in", it does not help. As I said, if your "in" clause has to include thousands of values, it's not practical to write such queries. Hive may even have a limit on the number of values you can provide in "in".
@Pranab,
I have a unique use case:
We are trying to build a data lake using HDFS.
– 200+ base tables with a few billion records / a few TBs of data.
– Support incremental insert/update with an SLA of less than 30 minutes.
I feel this is impossible to achieve using HDFS (or Hive, for that matter).
I tried a small POC using a few tables, but didn't get updates available within the given SLA.
I also tried a POC using a key-value store (e.g., Couchbase) as the data lake and was able to get updates within the given SLA.
But we would be happier if we could meet this SLA with HDFS as the data lake.
Do you have any other suggestions/techniques where HDFS can be used to achieve this SLA for the given size of data?
Did you try the technique outlined in this post? The only thing I can think of is tuning HDFS for better performance.
You can try increasing the cluster size. BTW, latency will increase as the existing data size grows, because incremental update involves joining with the existing data.
The current implementation does a reduce-side join with shuffle. If the size of the incremental batch is reasonable, you could try a map-side join.
Thanks for your reply!
We tried using HDFS only, no Hive at all.
We iterate over the existing files to locate updates and re-write the existing data, with the updates, to a new file. Disk I/O is the limiting factor for scaling, since we need to scan and re-write the files.
I did try partitioning on HDFS, but that didn't help much, since the incremental updates span all partitions!
Hello Pranab,
Great explanation. I have the same use case, with one difference: my incremental records arrive in order. That is, for one key there may be several updates, and all updates have to be applied in sequence, because one update carries information about subsequent updates.
How can I achieve that?
Do I have to compare each update record with the main files or records every time?
Regards
Sudarshan
As new updates arrive, you run the MR job using the current update file and the output from the last run of the MR job. You run the MR job once every time new updates arrive, in the same sequence.
Yes Pranab, but the problem is I have 5k files, and updates arrive as 10 XML files per minute. So what do you think about that?
Sudarshan, you can write a script that orders the files by time stamp and runs the MR job, taking the incremental files as input in that same order. But your cluster should be capable of running and completing the MR job 10 times per minute.
Another option is to concatenate a few incremental files, following the time order, and run the MR job on the concatenated file. Processing will be faster.
A third option would be for me to modify the MR job so that it can handle multiple incremental files. I will look into that and update you.
Thanks Pranab, I will look into the details of what you suggested. I will wait for your reply.
Sudarshan, I checked the implementation. It will work for multiple incremental files, as long as the sequence numbers appended to the file names are in the right order, if you are choosing the file name based option. For a given record, the record from the file with the highest sequence number as suffix (i.e., the latest update) will prevail.
Why not admit that, for an EDW, SCD Type 1, Type 2, etc. will not work in Hive? For a large organization there will be many fact and dimension tables holding data in excess of 100 GB, and some fact tables with TBs of data. None of these solutions will work. Fact tables can also be type 2.
Is it a good solution to use a document DB (CosmosDB, MongoDB, or another) to capture the updates and then use them in Hive together with transactional data?
You can always do updates in databases. The point was how to do updates in an append-only file system like HDFS.