A Hadoop data lake, unlike a traditional data warehouse, does not enforce schema on write and serves as a repository of data in different formats from various sources. If the data collected in a data lake is immutable, it simply accumulates in an append-only fashion and is easy to handle. Such data tends to be fact data, e.g., user behavior tracking data or sensor data.
However, dimension data or master data, e.g., customer data or product data, will typically be mutable. Generally it arrives in batches from some external source system, reflecting incremental inserts, updates and deletes in the external system. All incremental changes need to be consolidated with the existing master data. This is a problem faced in many Hadoop or Hive based Big Data analytics projects. Even if you are using the latest version of Hive, there is no bulk update or delete support. This post is about a Map Reduce job that will perform bulk insert, update and delete with data in HDFS. The implementation is part of the open source project chombo.
Hive HQL Does Not Help
Even if you are using the latest version of Hive, you are not out of the woods. The latest version of Hive HQL has support for updates and deletes, but that does not help. What we have from the external system are data feeds containing the records that got inserted, updated and deleted.
It’s not feasible to try to translate them to a set of corresponding HQL statements. Even if you could do that, Hive will be extremely inefficient because it will run a map reduce job for each such HQL statement. I think we are getting the picture. It looks like a dead end.
As we will see later, you could use the Map Reduce job described in this post, whether you are using Hive or doing some custom data science work directly in Hadoop.
As I mentioned earlier, immutable data i.e. insert only data does not pose any special problem. All you have to do is to simply drop such files in the Hadoop or Hive directory containing existing files. No additional work is necessary.
Hive Input Output Format
In one of my earlier posts, I outlined a solution specifically for Hive with the aid of a custom Input Output Format. The key idea is to maintain multiple versions of a record, making use of Hive dynamic partitions.
A compactor run periodically would get rid of older versions of data i.e. older dynamic partitions. The latest version of data will be consolidated to a new dynamic partition.
Bulk Mutation Map Reduce
The implementation is in the class RecordSetBulkMutator. Let’s go through some key ideas on how it works. In the current master data in Hadoop we have the latest version of all records for some entity, e.g., product. In the incoming incremental feed we may have one or more mutations for an existing record, as well as new records. Every record has a unique ID.
Essentially, we join the two data sets, i.e., master and incremental data, on the ID and do a secondary sort on the temporal ordering field.
On the reducer side, for a given record key, we get all the mutations in the value list. The last record in the list is the latest mutation, i.e., the latest version of the record, which is emitted if it’s an update and not emitted if it’s a delete. For an insert, there will be only one record, which will be emitted. Essentially it’s a merge and compact process.
For example, if the feed contained 2 updates for a record, the values list will contain 3 records: the first one being the record from the existing master data and the other two being the two updates from the incremental data feed. We simply emit the last record, because that is the latest version.
In the example above, if the last record corresponded to a delete, then we would not have emitted anything from the reducer.
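The merge and compact logic described above can be sketched in a few lines of plain Python. This is only an in-memory illustration of the idea, not the RecordSetBulkMutator code: the function name `merge_and_compact`, its parameters, and the choice to pass deletes as a separate list are all assumptions made for the example.

```python
from itertools import groupby


def merge_and_compact(master, upserts, deletes, id_ord=0, ts_ord=-1):
    """Return the latest surviving version of every record.

    Each record is a list of string fields. id_ord and ts_ord are the
    (hypothetical) positions of the record ID and the temporal ordering field.
    """
    # Tag every record with a flag telling whether it came from a delete feed.
    tagged = [(row, False) for row in master + upserts]
    tagged += [(row, True) for row in deletes]

    # Sort by ID, then by the temporal ordering field. This plays the role
    # of the join plus secondary sort done by the Map Reduce framework.
    tagged.sort(key=lambda t: (t[0][id_ord], int(t[0][ts_ord])))

    merged = []
    for _, versions in groupby(tagged, key=lambda t: t[0][id_ord]):
        # The last element in each group is the latest mutation.
        row, is_delete = list(versions)[-1]
        if not is_delete:
            merged.append(row)
    return merged
```

Calling it with the existing master records, a list of inserts and updates, and a list of deletes returns the new master data, one record per surviving ID.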
We assumed that each record contains a field that represents the temporal ordering of the mutations. It could be a time stamp or a sequence. We are assuming that the source system has such a field and that it’s being properly populated when data is being updated or deleted.
What if the source system does not have such a field? There are a few options in that case. On receipt of a data file on the Hadoop data lake side, we could insert a temporal ordering field. One easy solution is to create an epoch time stamp, assign it to the first record and assign incremented values to the rest of the records. This will work provided the mutations were done in the source system in the same order as they appear in the incremental data file.
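A minimal sketch of that first option follows. The function name `add_ordering_field` and its parameters are made up for illustration, and the ordering value is appended as the last field, which is an assumption that matches the layout used later in this post.

```python
import time


def add_ordering_field(lines, start=None, delim=","):
    """Append an increasing temporal ordering value to each record.

    The first record gets the current epoch time (or `start` if given),
    and every following record gets the previous value plus one.
    """
    base = int(time.time()) if start is None else start
    return [f"{line}{delim}{base + i}" for i, line in enumerate(lines)]
```

One thing to watch for: if files arrive in quick succession and contain many records, values from one file could overlap with those of the next. A running sequence kept across files would be one way around that.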
Another option is to rename the file such that it ends with _ followed by a time stamp or a sequence. With this approach, we are assigning the same time stamp to all the records in the file. One downside of this approach is that if there are multiple mutations on the same record within a file, one of them will be picked randomly on the reducer side, which may not be the latest.
We need a way to identify deletes. As we saw, on the reducer side, for a given record ID, if the last record has the mutation type delete, then we don’t emit the record. There are two options for identifying deletes. We could have a column in the data containing an op code. In the reducer, the op code could be used to identify whether the last record for a given key corresponds to a delete.
An easier option is to prefix the name of the data file containing all deletes with a special string that can be set through a configuration parameter. This will work as long as all the deleted records are in a separate file.
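The two ways of identifying a delete could be combined in one small check, sketched below. The function `is_delete` and its argument names are hypothetical; they only mirror the two options described above and are not taken from the chombo code.

```python
import os


def is_delete(fields, file_name, op_code_ordinal=None,
              deleted_op_code="D", deleted_file_prefix=None):
    """Decide whether a record represents a delete.

    Option 1: an op code column in the record itself.
    Option 2: the record came from a file whose name starts with a prefix.
    """
    if op_code_ordinal is not None:
        return fields[op_code_ordinal] == deleted_op_code
    if deleted_file_prefix is not None:
        return os.path.basename(file_name).startswith(deleted_file_prefix)
    # Neither option configured: treat everything as insert or update.
    return False
```

The op code check takes precedence here when both are configured, which is just a choice made for this sketch.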
I have run the bulk mutation map reduce for some product data containing the following fields
- Product ID
- Time stamp
I have chosen the option where the data has a temporal ordering field, which is the last field. I have one incremental data file containing a mixture of inserts and updates and another containing deletes only.
For handling deletes, I have chosen the option of prefixing the names of the files containing deletes only with a string that’s defined through a configuration parameter. Here are all the configuration parameters. The record ID can be a composite of multiple columns. In that case, you need to provide a comma-separated list of column ordinals for id.field.ordinals.
#op.code.field.ordinal=4
deleted.op.code=D
deleted.record.file.prefix=del
temporal.ordering.field.ordinal=5
temporal.ordering.field.numeric=true
id.field.ordinals=0
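To make the role of id.field.ordinals concrete, here is a rough sketch of how a composite record key could be built from such a configuration. Both helper functions are hypothetical and written only for this illustration; the actual job reads its configuration through its own mechanism.

```python
def load_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def record_key(fields, props):
    """Build the record key from the columns listed in id.field.ordinals."""
    ordinals = [int(o) for o in props["id.field.ordinals"].split(",")]
    return tuple(fields[o] for o in ordinals)
```

With id.field.ordinals=0 the key is just the product ID. With a value such as 0,2 the key would be the pair made of the first and third columns.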
Here are some snippets of the master data before and after the job ran, showing a record being updated. You can see that the product with the ID VR8NCNE7DV has its inventory level modified in the second set of records, which is from the output of the map reduce job.
R217970F17,wearables,misfit,103,21,1427761080
6QD0BAVS7I,laptop,asus,502,54,1427764070
VR8NCNE7DV,wearables,fitbit,112,88,1427766458
3SDS30A11P,laptop,thinkpad,551,54,1427770906
.............................................
VOA31MCU9I,cell phone,apple,150,43,1427644188
VR8NCNE7DV,wearables,fitbit,112,82,1428415316
VRN5D60451,tablet,amazon kindle,258,96,1428527865
VSE72T0P4M,tablet,samsung,294,51,1426578803
A Simple Workflow
A simple workflow as described below could be used to merge incremental data with the base data using the map reduce job. It involves doing some HDFS file juggling and running the MR job.
1. Base data is in the HDFS directory …/product/master
2. Incremental data files, as they arrive, are dumped into …/product/incr
3. Move the current set of incremental data files to …/product/working
4. Run the MR job, taking input from …/product/master and …/product/working
5. Output of the MR job goes to …/product/output
6. If temporal ordering is being done using a file name suffix, then an _ and an epoch time stamp or sequence should be appended to each output file name
7. Move the current base data files from …/product/master to …/product/backup
8. Move the new base data files from …/product/output to …/product/master
Step 3 is necessary because we want to run the MR job on a current snapshot of incremental data files. It prevents the MR job from being affected by newly arriving incremental data files, while the job is running.
Steps 7 and 8 should execute quickly, since they involve only file moves, so the impact on any MR job or Hive query will be minimal. Ideally, steps 7 and 8 should be performed as an atomic operation.
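The file juggling in this workflow could be scripted roughly as follows. This is a sketch under several assumptions: the function `plan_merge_workflow` is hypothetical, the directory names under the base path follow the list above, the command that launches the MR job is supplied by the caller because it depends on your build, and the file renaming of step 6 is left out. The script only builds the `hdfs dfs -mv` commands; it does not run them.

```python
def plan_merge_workflow(base_dir, mr_job_cmd):
    """Return the shell commands for one merge cycle, in execution order.

    base_dir   : parent directory holding master, incr, working, output, backup
    mr_job_cmd : caller supplied command (list of strings) that runs the MR job
    """
    dirs = {name: f"{base_dir}/{name}"
            for name in ("master", "incr", "working", "output", "backup")}
    return [
        # Step 3: snapshot the incremental files that have arrived so far.
        ["hdfs", "dfs", "-mv", f"{dirs['incr']}/*", dirs["working"]],
        # Steps 4 and 5: run the bulk mutation job (master + working -> output).
        list(mr_job_cmd),
        # Step 7: move the old base data out of the way.
        ["hdfs", "dfs", "-mv", f"{dirs['master']}/*", dirs["backup"]],
        # Step 8: promote the job output to be the new base data.
        ["hdfs", "dfs", "-mv", f"{dirs['output']}/*", dirs["master"]],
    ]
```

Each command in the returned list could then be executed in order with `subprocess.run(cmd, check=True)`, so that a failure stops the cycle before the base data is touched.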
With Hive, instead of …/product/master, the Hive external table data directory or the warehouse data directory should be used.
If you are using Hive with the data partitioned, you need to partition the incremental data file the same way and create multiple partitioned incremental data files. Then you need to run the MR job multiple times, once for each partition.
Each run will take as input the base data for a partition and the corresponding incremental data for the same partition. Essentially, you have to repeat a workflow very similar to the one described above for each Hive partition.
Since the incremental data files are not likely to be too big, the easiest thing might be to write a Python script to partition the data.
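Such a script could look something like the sketch below. It assumes the partition is determined by the value of a single column and writes one file per partition value under a local output directory. The function names, the output file name and the directory layout are all hypothetical and would need to match how your Hive table is actually partitioned.

```python
from collections import defaultdict
from pathlib import Path


def partition_lines(lines, partition_ordinal, delim=","):
    """Group delimited records by the value of the partition column."""
    partitions = defaultdict(list)
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        partitions[line.split(delim)[partition_ordinal]].append(line)
    return dict(partitions)


def write_partitions(partitions, out_dir, file_name="incr.txt"):
    """Write each group of records to <out_dir>/<partition value>/<file_name>."""
    for value, lines in partitions.items():
        part_dir = Path(out_dir) / value
        part_dir.mkdir(parents=True, exist_ok=True)
        (part_dir / file_name).write_text("\n".join(lines) + "\n")
```

For the product data, partitioning on column 1 would produce one incremental file per product category, each of which becomes the incremental input for the MR job run on the matching partition.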
Scaling with Partitioning
As the merged master data volume grows, scaling may become an issue. Master data should become up to date as quickly as possible after the arrival of the incremental data. Intelligent partitioning of the data could make the merge and compact process more scalable. Even if you are not using Hive, you could still partition your HDFS master data files.
Partitioning should be done in a way that minimizes the number of partitions affected by the merge and compact process. For example, we could partition by creation date, based on the intuition that mutations tend to happen more with recent data. If this intuition turns out to be true, only a small fraction of the partitions of the master data will be involved in the merge and compact process.
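As a rough illustration, the set of partitions touched by an incremental feed could be worked out before running any jobs. The sketch below assumes each record carries a creation time stamp in epoch seconds at a known column position and that partitions are by calendar month. Both are assumptions for the example, as is the function name `affected_partitions`.

```python
from datetime import datetime, timezone


def affected_partitions(rows, create_ts_ordinal, fmt="%Y-%m"):
    """Return the set of date partitions that the incremental rows fall into.

    rows              : records as lists of string fields
    create_ts_ordinal : position of the (assumed) creation time stamp field
    fmt               : strftime pattern defining the partition granularity
    """
    partitions = set()
    for row in rows:
        created = datetime.fromtimestamp(int(row[create_ts_ordinal]), tz=timezone.utc)
        partitions.add(created.strftime(fmt))
    return partitions
```

Only the master data partitions in the returned set need to go through the merge and compact job. All other partitions can be left untouched.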
What About Other Hive Storage Formats
The bulk mutation map reduce uses field delimited flat text data as input and produces field delimited flat text data as output. What if the Hive tables are in ORC, Parquet or some other storage format? You could follow these steps to handle other storage formats.
1. Export the Hive table data to a separate table with delimited text storage format using insert overwrite.
2. Run the bulk mutation map reduce as outlined earlier, using the output of step 1. The output of this will be delimited text files.
3. Treating the output of step 2 as a Hive table with delimited text storage format, run insert overwrite to create Hive tables of the desired storage format.
If the bulk mutation map reduce is the only way data is being merged, then step 1 needs to be performed only once.
For subsequent executions, instead of running step 1, you could take the output of step 2 from the previous run and use it in step 2 of the current run. Essentially, you are maintaining a parallel Hive table with delimited text storage format.
Here is a tutorial document, in case you want to try it out. The solution pattern used here is widely used by many NoSQL databases. Multiple versions of a document, or of a column of a row, resulting from mutations are maintained. During a query, the database returns the latest version of the data. A compactor runs periodically to remove the older versions of data.