I was prompted to write this post by a recent discussion thread in the LinkedIn Hadoop Users Group about fuzzy string matching for duplicate record identification with Hadoop. As part of sifarish, my open source Hadoop based recommendation engine project, I have a MapReduce class for fuzzy matching between entities with multiple attributes. It's used for content based recommendation. My initial reaction was that the same MapReduce class could be used for duplicate detection, by modelling all the fields of a record as text fields and calculating the Jaccard similarity between corresponding text fields.
On further thought, I realized that Jaccard similarity may not be the right choice and that Levenshtein Distance is more appropriate. Accordingly, I added support for Levenshtein Distance based text matching to sifarish. In this post, I will show how it's used to detect duplicate records.
Fuzzy Entity Matching
The details of the matching algorithms can be found in my earlier posts. Various attribute types are supported for an entity, including integer, double, categorical, text, time, location etc. Essentially, for any pair of entities, a distance is calculated between each pair of corresponding attributes. These attribute wise distances are then aggregated over all the attributes to find the distance between the two entities.
Why is Jaccard distance not appropriate for this use case? Given two blocks of text, Jaccard similarity is the number of words common to both divided by the number of words in their union. Two words either match exactly or not at all: a difference in one character counts the same as a difference in every character. Levenshtein Distance gives us a more fine grained way to match text.
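A minimal Python sketch makes the problem concrete. The function name is my own and this is not the sifarish implementation. It computes word level Jaccard similarity on the address field of one of the sample records used later in this post.

```python
def jaccard_similarity(text1: str, text2: str) -> float:
    """Word level Jaccard similarity: shared words / union of words."""
    words1 = set(text1.lower().split())
    words2 = set(text2.lower().split())
    if not words1 and not words2:
        return 1.0  # two empty fields are treated as identical
    return len(words1 & words2) / len(words1 | words2)


# A one digit typo in the street number...
print(jaccard_similarity("2934 Encino Pt", "2935 Encino Pt"))  # 0.5
# ...scores the same as a completely different street number.
print(jaccard_similarity("2934 Encino Pt", "9999 Encino Pt"))  # 0.5
```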
Levenshtein Distance
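Given two words, it's defined as the number of character inserts and deletes necessary to make one word the same as the other. (The classic definition of Levenshtein Distance also allows character substitutions; the insert and delete only variant used here is sometimes called the LCS distance.) The distance between two words w1 and w2 is calculated as follows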
distance(w1,w2) = length(w1) + length(w2) - 2 * maxCommonSubSeqLength(w1,w2)
where
maxCommonSubSeqLength is the length of the longest common subsequence of w1 and w2.
The distance is normalized by dividing it by the sum of the lengths of the two words, as below. This keeps the normalized distance between 0 and 1.0.
normDistance(w1,w2) = distance(w1,w2) / (length(w1) + length(w2))
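Here is a minimal Python sketch of the two formulas above. The longest common subsequence is computed with the standard dynamic programming recurrence. The function names mirror the formulas but are my own; this is not the code in EditDistanceSimilarity.

```python
def max_common_subseq_length(w1: str, w2: str) -> int:
    """Length of the longest common subsequence, via dynamic programming."""
    # prev[j] holds the LCS length of the w1 prefix seen so far and w2[:j]
    prev = [0] * (len(w2) + 1)
    for c1 in w1:
        curr = [0] * (len(w2) + 1)
        for j, c2 in enumerate(w2, start=1):
            if c1 == c2:
                curr[j] = prev[j - 1] + 1
            else:
                curr[j] = max(prev[j], curr[j - 1])
        prev = curr
    return prev[-1]


def distance(w1: str, w2: str) -> int:
    """Number of character inserts and deletes needed to turn w1 into w2."""
    return len(w1) + len(w2) - 2 * max_common_subseq_length(w1, w2)


def norm_distance(w1: str, w2: str) -> float:
    """Distance divided by the combined length of both words, in [0, 1]."""
    total = len(w1) + len(w2)
    return distance(w1, w2) / total if total else 0.0


print(distance("2934", "2935"))       # 2
print(norm_distance("2934", "2935"))  # 0.25
```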
For a pair of text blocks, the normalized distance is calculated between corresponding word pairs. The final distance between the two text fields is the distance averaged over all the words. The implementation is in the class EditDistanceSimilarity. Various attribute distance algorithms are supported, e.g., Euclidean, Manhattan etc.
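The averaging over word pairs could be sketched as follows. This post does not say how sifarish handles two fields with different word counts. The rule used below, where a word without a counterpart counts as a full distance of 1.0, is my assumption, and so are all the names.

```python
def lcs_length(w1: str, w2: str) -> int:
    """Longest common subsequence length (dynamic programming, one row at a time)."""
    prev = [0] * (len(w2) + 1)
    for c1 in w1:
        curr = [0]
        for j, c2 in enumerate(w2, start=1):
            curr.append(prev[j - 1] + 1 if c1 == c2 else max(prev[j], curr[j - 1]))
        prev = curr
    return prev[-1]


def norm_distance(w1: str, w2: str) -> float:
    """Insert/delete distance divided by the combined length, in [0, 1]."""
    total = len(w1) + len(w2)
    return (total - 2 * lcs_length(w1, w2)) / total if total else 0.0


def text_field_distance(text1: str, text2: str) -> float:
    """Average normalized distance over corresponding word pairs."""
    words1, words2 = text1.split(), text2.split()
    n = max(len(words1), len(words2))
    if n == 0:
        return 0.0
    total = 0.0
    for i in range(n):
        if i < len(words1) and i < len(words2):
            total += norm_distance(words1[i], words2[i])
        else:
            total += 1.0  # assumption: an unmatched word counts as fully different
    return total / n


print(text_field_distance("2934 Encino Pt", "2935 Encino Pt"))  # about 0.083
```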
This is not the only way to calculate Levenshtein Distance. It is also possible to treat the whole text field as one token by setting the parameter edit.dist.token to false. A word of caution, though. The computation time of edit distance grows non-linearly with token length, because the usual dynamic programming approach fills a table of length(w1) x length(w2) cells. Treating the whole text field as one token may be very computationally intensive.
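To get a feel for the cost, the sketch below counts the cells a dynamic programming edit distance fills under each approach: token wise and whole field. It assumes one full length(w1) x length(w2) table per comparison, which may not match how EditDistanceSimilarity is implemented.

```python
def dp_cells_token_wise(text1: str, text2: str) -> int:
    """Cells filled when corresponding words are compared one pair at a time.
    zip() stops at the shorter field, so unmatched words cost nothing here."""
    return sum(len(a) * len(b) for a, b in zip(text1.split(), text2.split()))


def dp_cells_whole_field(text1: str, text2: str) -> int:
    """Cells filled when each whole field (spaces included) is one token."""
    return len(text1) * len(text2)


addr1, addr2 = "2934 Encino Pt", "2935 Encino Pt"
print(dp_cells_token_wise(addr1, addr2))   # 56
print(dp_cells_whole_field(addr1, addr2))  # 196
```

For this 14 character address, the whole field approach fills 196 cells against 56 for the token wise approach. The gap widens quadratically as fields get longer.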
Similarity Map Reduce
The class SameTypeSimilarity contains the implementation. The details of its inner workings can be found in my earlier posts. Essentially, it does a hash based self join and works as follows
- We configure a set of buckets
- Each record is hashed into one of the buckets
- A bucket pair and the associated records are processed in one reducer call.
- The reducer pairs records from each bucket in a nested loop and calculates distances
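The four steps can be simulated in memory with a short Python sketch. It only illustrates the idea and is not SameTypeSimilarity itself. The helper names are hypothetical. I'm also assuming each record is replicated to every bucket pair that contains its own bucket, so that every record pair meets in exactly one reducer call.

```python
import zlib
from collections import defaultdict
from itertools import combinations


def bucket_of(rec_id: str, bucket_count: int) -> int:
    # Stable hash, so a record always lands in the same bucket
    return zlib.crc32(rec_id.encode()) % bucket_count


def map_phase(records, bucket_count):
    """Send each record to every bucket pair that includes its own bucket."""
    groups = defaultdict(list)
    for rec_id, fields in records:
        b = bucket_of(rec_id, bucket_count)
        for other in range(bucket_count):
            groups[(min(b, other), max(b, other))].append((b, rec_id, fields))
    return groups


def reduce_phase(key, values, dist_fn):
    """One reducer call: nested loop over the records of bucket i and bucket j."""
    i, j = key
    if i == j:
        pairs = combinations(values, 2)  # records within the same bucket
    else:
        left = [v for v in values if v[0] == i]
        right = [v for v in values if v[0] == j]
        pairs = ((a, b) for a in left for b in right)
    for (_, id1, f1), (_, id2, f2) in pairs:
        yield id1, id2, dist_fn(f1, f2)


def self_join(records, bucket_count, dist_fn):
    results = []
    for key, values in map_phase(records, bucket_count).items():
        results.extend(reduce_phase(key, values, dist_fn))
    return results


records = [("106379", "2934 Encino Pt"), ("206379", "2935 Encino Pt"),
           ("158280", "2756 Bulls Bay Hwy")]
for id1, id2, d in self_join(records, bucket_count=2, dist_fn=lambda a, b: int(a != b)):
    print(id1, id2, d)
```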
It turns out that proper configuration of the number of buckets is critical for this example. The number of buckets should be large, so that each reducer call does not have to process too many records. The number of buckets is set using the parameter bucket.count. While running the map reduce, I had reducer tasks terminated by the Hadoop job tracker because of heartbeat timeouts, until I increased the bucket count.
As a rule of thumb, I would suggest choosing the number of buckets such that each bucket has about 4 or 5 records. Otherwise you may have the unfortunate experience of a reducer taking too much processing time in one call and eventually being terminated after a timeout.
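The rule of thumb translates into a little arithmetic. The record count below is a made up example. The count of reducer calls assumes one call per unordered bucket pair, including each bucket paired with itself.

```python
import math


def plan_buckets(num_records: int, records_per_bucket: int = 5):
    """Apply the rule of thumb and report the work each reducer call would see."""
    bucket_count = max(1, math.ceil(num_records / records_per_bucket))
    # One reducer call per unordered bucket pair, including a bucket with itself
    reducer_calls = bucket_count * (bucket_count + 1) // 2
    # A call for two different buckets compares every record of one with every record of the other
    comparisons_per_call = records_per_bucket * records_per_bucket
    return bucket_count, reducer_calls, comparisons_per_call


print(plan_buckets(10_000))  # (2000, 2001000, 25)
```

The total number of comparisons is still roughly n x n / 2. A larger bucket count just spreads that work over many small reducer calls, so no single call runs long enough to hit the task timeout.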
In this map reduce, every record is compared with every other record, so the complexity is O(n x n). However, for our particular example, if we had two data sets that are supposed to be identical, a simpler approach could be taken: we could simply compare corresponding records from each set. The complexity in that case would be O(n).
Attribute Distance Threshold
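One way to cut down the processing is to abandon the distance calculation between two records as soon as the distance between an attribute pair is found to be above a predefined threshold. Every record pair is still visited, so the number of comparisons remains O(n x n). However, most clearly different pairs are rejected after a single attribute comparison.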
For example, if the distance between the name fields of two customer records is above the threshold, we can skip processing the remaining attributes and set the distance between the two records to a large value.
The threshold value is defined in the meta data JSON file as below. The threshold can be set for any number of attributes. This is how it’s set for the name field of a customer record.
{ "name" : "name", "ordinal" : 1, "dataType" : "text", "distThreshold" : 0.4 }
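The early exit could look like the Python sketch below. Three things are assumptions for illustration: the second metadata entry, the MAX_DISTANCE value, and averaging the attribute distances. I'm also assuming ordinal is the zero based column index of the field, with the ID in column 0.

```python
import json

# Field metadata in the same shape as the JSON above; the address entry is hypothetical.
FIELDS = json.loads("""[
    {"name": "name", "ordinal": 1, "dataType": "text", "distThreshold": 0.4},
    {"name": "address", "ordinal": 2, "dataType": "text"}
]""")

MAX_DISTANCE = 1.0  # assumed "large value" for abandoned pairs


def record_distance(rec1, rec2, attr_dist):
    """Average attribute distance, abandoning the pair once a threshold is exceeded."""
    total = 0.0
    for field in FIELDS:
        col = field["ordinal"]  # assumed zero based column index (ID is column 0)
        d = attr_dist(rec1[col], rec2[col])
        threshold = field.get("distThreshold")
        if threshold is not None and d > threshold:
            return MAX_DISTANCE  # skip the remaining attributes
        total += d
    return total / len(FIELDS)


exact_match = lambda a, b: 0.0 if a == b else 1.0
r1 = ["106379", "Richard Gomez", "2934 Encino Pt"]
r2 = ["158280", "Jim Dobbs", "2756 Bulls Bay Hwy"]
print(record_distance(r1, r2, exact_match))  # 1.0, address never compared
```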
Two Near Identical Data Sets
If you have two data sets where the records are supposed to be identical, except for small differences, the processing can be performed in O(n) time. This optimization is enabled with inter set matching as below.
inter.set.matching=true
set.ID.size=1
The first parameter enables inter set matching. For this to work, the entity ID of each record needs to be encoded in a special way. The first few characters of the entity ID are the set ID and the remaining characters comprise the real entity ID within a set. The length of the set ID is defined by the second parameter above.
Essentially, matching is performed between records from the two sets only if their entity IDs match after the set ID is peeled off. So we are finding distances only between corresponding entities from the two data sets.
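The pairing rule could be sketched like this in Python. The names are hypothetical. In a map reduce job, the grouping by stripped entity ID would come from emitting that ID as the map output key. With set.ID.size=1, the sample IDs 106379 and 206379 both reduce to 06379.

```python
from collections import defaultdict

SET_ID_SIZE = 1  # mirrors set.ID.size=1


def split_id(entity_id: str):
    """Peel the set ID prefix off an encoded entity ID."""
    return entity_id[:SET_ID_SIZE], entity_id[SET_ID_SIZE:]


def corresponding_pairs(records):
    """Pair records that share a real entity ID but come from different sets."""
    by_entity = defaultdict(list)
    for rec in records:
        set_id, real_id = split_id(rec[0])
        by_entity[real_id].append((set_id, rec))
    pairs = []
    for members in by_entity.values():
        for a in range(len(members)):
            for b in range(a + 1, len(members)):
                if members[a][0] != members[b][0]:
                    pairs.append((members[a][1], members[b][1]))
    return pairs


records = [
    ["106379", "Richard Gomez"], ["158280", "Jim Dobbs"],
    ["206379", "Richard Gomez"], ["258280", "Jim Dobbs"],
]
for a, b in corresponding_pairs(records):
    print(a[0], b[0])  # prints 106379 206379, then 158280 258280
```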
Duplicate Customer Records
We will use customer records as an example. A record has the following fields. The first field, the ID, does not enter into the distance calculation in any way.
- ID
- Name
- Address
- City and state
- Phone number
The data consists of two sets of identical customer records. In some records, I introduced small typographical errors. Here is some sample input:
106379,Richard Gomez,2934 Encino Pt,San Antonio TX 78259,210 4811932
158280,Jim Dobbs,2756 Bulls Bay Hwy,Jacksonville FL 32220,312 2850284
137943,Robert Lewis,194 Buckboard Dr,Augusta GA 30907,406 5404029
125849,Jena Marin,276 Durham St,Menlo Park CA 94025,650 2957406
156290,Dharam Patel,84 Prospect Hill Dr,Tewksbury MA 01876,718 4702915
.........
206379,Richard Gomez,2935 Encino Pt,San Antonio TX 78259,210 4811932
258280,Jim Dobbs,2756 Bulls Bay Hwy,Jacksonville FL 32220,312 2850284
237943,Robert Lewis,194 Buckboard Dr,Augusta GA 30908,406 5404029
225849,Jena Marin,276 Durham St,Menlo Park CA 94025,650 2957406
The output of the MR class SameTypeSimilarity consists of an ID pair followed by the distance. There are three types of output, as follows
- Distance is 0 for identical records
- Distance is close to 0 for records that are identical except for small typographical errors
- Different records, with a high distance value
The second type is of main interest to us. Because of the fuzzy matching logic, we are able to identify records as duplicates in spite of small typographical errors. Here is some sample output. The last column is the distance scaled by 1000.
106379,278253,757
106379,206379,16
178253,278253,0
178253,206379,757
278253,206379,757
168295,185062,612
We see all 3 types of output described earlier. Let's look at the records with IDs 106379 and 206379, whose distance is 16. It's a case of near duplicate records with a small typographical error. Here are the two corresponding records. There is one wrong character in the address field.
106379,Richard Gomez,2934 Encino Pt,San Antonio TX 78259,210 4811932
206379,Richard Gomez,2935 Encino Pt,San Antonio TX 78259,210 4811932
That one wrong character has made the distance between the records 16 instead of 0.
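A small post processing sketch like the one below could separate the three kinds of output. The cutoff of 100 (a distance of 0.1 before scaling) that separates near duplicates from different records is a hypothetical choice and would need tuning on real data.

```python
def classify(output_lines, near_dup_max=100):
    """Sort 'id1,id2,scaledDistance' lines into the three kinds of output."""
    exact, near, different = [], [], []
    for line in output_lines:
        id1, id2, dist = line.strip().split(",")
        dist = int(dist)
        if dist == 0:
            exact.append((id1, id2))
        elif dist <= near_dup_max:  # hypothetical near duplicate cutoff
            near.append((id1, id2, dist))
        else:
            different.append((id1, id2, dist))
    return exact, near, different


sample = """106379,278253,757
106379,206379,16
178253,278253,0
178253,206379,757
278253,206379,757
168295,185062,612""".splitlines()

exact, near, different = classify(sample)
print(exact)  # [('178253', '278253')]
print(near)   # [('106379', '206379', 16)]
```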
Attribute Distance Aggregation
The attribute distances are aggregated over all attributes. There are different aggregation algorithms. For this example, I have used Manhattan distance, a.k.a. L1 distance.
All attributes are treated equally here. However, it is possible to assign different attribute weights through the meta data. For example, if I had specified a weight greater than 1 (e.g. 1.2) for the phone number field, it would have shrunk the distance for that field. In other words, the distance between two phone numbers is made to appear less than the actual distance.
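A minimal sketch of L1 aggregation with attribute weights follows. The text doesn't say exactly how a weight shrinks a distance. Dividing each attribute distance by its weight is my assumption, and so is dividing the sum by the number of attributes to keep the result between 0 and 1. The attribute distances are made up values.

```python
def aggregate_manhattan(attr_distances, weights=None):
    """L1 aggregation of per attribute distances with optional weights."""
    if weights is None:
        weights = [1.0] * len(attr_distances)
    # Assumption: a weight > 1 shrinks an attribute's distance by dividing it
    weighted = [d / w for d, w in zip(attr_distances, weights)]
    # L1 distance is the plain sum; dividing by the count keeps it in [0, 1]
    return sum(weighted) / len(weighted)


# name, address, city and state, phone number
dists = [0.0, 0.0, 0.0, 0.6]
print(aggregate_manhattan(dists))                        # equal weights
print(aggregate_manhattan(dists, [1.0, 1.0, 1.0, 1.2]))  # phone distance shrunk
```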
Wrapping Up
In this post I have shown how to use a Hadoop based solution for finding near duplicate records. Here is a tutorial document with details on how to run the map reduce job.
For commercial support for this solution or other solutions in my github repositories, please talk to ThirdEye Data Science Services. Support is available for Hadoop or Spark deployment on cloud, including installation, configuration and testing.
Hi Pranab,
A great post. I really found a great idea for identifying duplicate records.
Mathew
I am glad you liked it
Thank you Pranab!
Hi Pranab,
Let's say I have a huge number of candidate records (CSV data), and I want to use MapReduce to find duplicate records based on each candidate's email ID and contact number.
I am using a WritableComparator for my candidate object. The problem is that I can't figure out where to compare two objects.
Can you please help me out?
Please follow the steps in the tutorial to find near duplicate records using sifarish. There are complex algorithms involved in finding the distance between two objects with different types of attributes. The link to the tutorial is at the end of the blog. Here it is anyway:
https://github.com/pranab/sifarish/blob/master/resource/duplicate_detection_tutorial.txt
Hi Pranab,
Your blog is very useful. I am working on an algorithm, LetterPairSimilarity, to find identical strings. As I am new to Hadoop, I need your help with implementing the same algorithm in MapReduce on Hadoop. The algorithm can be found at this link: http://www.catalysoft.com/articles/StrikeAMatch.html. I will be thankful to you.
Thanks,
Priya
Priya
Assuming each document has a document ID and the text, the mapper output will be a pair of hash values based on the document ID, as is done in the SameTypeSimilarity map reduce. In the reducer, you could apply the letter pair algorithm. Various text matching algorithms are supported in sifarish. I could easily add this one too.
Pranab
Priya
I have implemented the character pair text similarity algorithm. You have to specify textMatchingAlgorithm attribute as charPair in the JSON metadata file to use it.
Pranab
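For anyone following along, here is a minimal Python sketch of the letter pair similarity described in the article Priya linked. Each word is broken into adjacent character pairs. The similarity is then twice the number of shared pairs divided by the total number of pairs. This is my reading of that article, not the charPair code in sifarish.

```python
def letter_pairs(word: str):
    """Adjacent character pairs of a word, e.g. FRANCE -> FR, RA, AN, NC, CE."""
    return [word[i:i + 2] for i in range(len(word) - 1)]


def word_letter_pairs(text: str):
    """Letter pairs of every word in the text, ignoring case."""
    pairs = []
    for word in text.upper().split():
        pairs.extend(letter_pairs(word))
    return pairs


def letter_pair_similarity(text1: str, text2: str) -> float:
    """2 * shared pairs / (pairs in text1 + pairs in text2); each pair matches once."""
    pairs1 = word_letter_pairs(text1)
    pairs2 = word_letter_pairs(text2)
    union = len(pairs1) + len(pairs2)
    if union == 0:
        return 0.0  # no pairs to compare, e.g. only single letter words
    remaining = list(pairs2)
    intersection = 0
    for pair in pairs1:
        if pair in remaining:
            remaining.remove(pair)  # so a repeated pair is not counted twice
            intersection += 1
    return 2.0 * intersection / union


print(letter_pair_similarity("FRANCE", "FRENCH"))  # 0.4
```

FRANCE and FRENCH share the pairs FR and NC out of 10 pairs in total, which gives 0.4.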
Hi Pranab,
I'm trying to find a Hadoop solution to match one address dataset to another, and this post is very helpful. I understand that Levenshtein distance seems to be my best choice of distance function. But since I'm a newbie to the Hadoop world, I'm a bit confused about the MapReduce process you used here. Specifically: how does the master node split the data, what tasks do the slave nodes perform, and how is their output reduced? I'd very much appreciate it if you could explain more or point me to some other posts of yours!
Thanks a lot and Happy New Year!
Molly
I am glad you liked the post. To learn how Hadoop works, I would suggest the Yahoo tutorial. For our problem, with n records, there are n*n comparisons. That work is split up and executed in parallel. You will get as many output files as the number of reducers you choose. Your final result is the concatenation of all those output files.
Pranab,
Thank you for the quick response!
Let me briefly explain how I understand this and what the problem is. To match dataset A (containing n records) to dataset B (containing m records), we need to split dataset A into N buckets of n/N records each. Running this job on N nodes, with each node comparing one subset of A against the entire dataset B, speeds things up by a factor of N.
I think the problem with this method is that we have to store the entire dataset B (which could be large) on each node to do the comparison, and it seems we are not using the full potential of MapReduce (maybe I'm wrong…). So I wonder if any optimization is possible.
Please let me know whether I got this right, and what you think of my questions.
Thanks,
Molly
There will be n x m record comparisons. Sifarish uses a hashed Cartesian join. Your understanding of how Sifarish works is not correct. The data sets are split into hash buckets. Buckets from the 2 different data sets are paired up. Each bucket pair is processed by some reducer.
In Hadoop, you just put the data into HDFS. You don't need to worry about how the data is accessed by the mappers and reducers. That is internal to Hadoop, and Hadoop takes care of it. Again, I would recommend reading the Hadoop tutorial first. Here is the link
https://developer.yahoo.com/hadoop/tutorial/
Hi Pranab,
Great post & great product indeed. Ever thought about porting it to Spark?
Cheers.
Dripple
Sure, I would love to port it to Spark, and many of my other projects too. But there are only so many hours in a day.
Pranab,
Would love to code at least the deduplication part in Spark. I will take a closer look.
Hi Pranab,
This is very helpful… did you apply any kind of blocking technique here? If not, won't the computation take a lot of time when matching millions of records?
I am not sure what you mean by blocking techniques. It uses hash partitioning. Each call to the reducer gets a pair of hash partitioned data sets. It iterates through the two data sets in a nested loop to compute a duplicate score between record pairs. It's an O(n x n) brute force algorithm and relies on Hadoop's scalability.
Some optimization may be possible with a combiner, if there were an efficient way to quickly find out whether two records are very unlikely to be duplicates.
I meant that with a million by a million records, we can use blocking techniques such as Phonex, the last 5 digits of the phone number, or the zip code to reduce the match space. We can then run the similarity algorithms on this reduced search space.
@gangele
Makes sense to apply that kind of domain specific logic to prune the search space. In this specific case, I think what you are saying is that if the phone number and name match, then the records are essentially duplicates.
I will keep this in mind. I am going to port this to a new Spark based ETL project I have started on github. I will make these kinds of optimizations in the Spark implementation.
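A blocking step along the lines gangele describes could be sketched as follows. The blocking key (zip code plus last 5 digits of the phone number) and the field layout follow the sample customer records in this post; everything else is hypothetical. Only records sharing a key are compared.

```python
from collections import defaultdict
from itertools import combinations


def block_key(record):
    """Hypothetical blocking key: zip code plus last 5 digits of the phone number."""
    city_state, phone = record[3], record[4]
    zip_code = city_state.split()[-1]
    digits = "".join(ch for ch in phone if ch.isdigit())
    return zip_code, digits[-5:]


def candidate_pairs(records):
    """Yield only the record pairs that fall into the same block."""
    blocks = defaultdict(list)
    for rec in records:
        blocks[block_key(rec)].append(rec)
    for members in blocks.values():
        yield from combinations(members, 2)


sample = [
    "106379,Richard Gomez,2934 Encino Pt,San Antonio TX 78259,210 4811932",
    "158280,Jim Dobbs,2756 Bulls Bay Hwy,Jacksonville FL 32220,312 2850284",
    "137943,Robert Lewis,194 Buckboard Dr,Augusta GA 30907,406 5404029",
    "206379,Richard Gomez,2935 Encino Pt,San Antonio TX 78259,210 4811932",
    "237943,Robert Lewis,194 Buckboard Dr,Augusta GA 30908,406 5404029",
]
records = [line.split(",") for line in sample]
for a, b in candidate_pairs(records):
    print(a[0], b[0])  # only 106379 206379
```

Note that 137943 and 237943 land in different blocks because of the typo in the zip code. Blocking on an exact key trades some recall for speed.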
@Dripple
As I mentioned in my earlier comment, I will be porting this to a new Spark based ETL project on github that I have started. I also have various Hadoop MR jobs in different projects that have ETL functionality. I will be porting them to the new Spark project.
Hi Pranab,
Thanks for the tutorial. I ran the above program with a small set of data and it works fine. Now I am running it with 1562 records, and it takes almost 1 hour and then times out. Based on further reading of the documentation: we have about 7 nodes, so I set num.reducer=7 in the property file and increased bucket.count to 2000. But it has still been running for more than half an hour and I haven't seen any output. Any thoughts on this?
Thanks for the help
Vinoj
It timed out again after almost 45 minutes. Here is the message I am getting:
Thanks for the help
15/03/29 09:15:44 INFO ipc.Client: Retrying connect to server: votlbdcd03.tms.toyota.com/10.63.37.105:41590. Already tried 1 time(s); maxRetries=3
15/03/29 09:16:04 INFO ipc.Client: Retrying connect to server: votlbdcd03.tms.toyota.com/10.63.37.105:41590. Already tried 2 time(s); maxRetries=3
15/03/29 09:16:24 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=FAILED. Redirecting to job history server
15/03/29 09:16:24 INFO mapreduce.Job: Job job_1427166997404_0446 running in uber mode : false
15/03/29 09:16:24 INFO mapreduce.Job: map 100% reduce 0%
15/03/29 09:16:24 INFO mapreduce.Job: Task Id : attempt_1427166997404_0446_r_000002_0, Status : FAILED
AttemptID:attempt_1427166997404_0446_r_000002_0 Timed out after 600 secs
15/03/29 09:16:24 INFO mapreduce.Job: Task Id : attempt_1427166997404_0446_r_000005_0, Status : FAILED
Vinoj
I need more details on your use case. Edit distance is for structured text fields like address, phone number etc. If it's not used properly, the reducers will take too long and tasks will start timing out. I need your configuration property file, JSON meta data file and a sample of about 20 lines of your input. You could either file an issue in github with all this info or send me an email. My email id is on the github project home page.
BTW, num.reducer is a global parameter. If you are running multiple MR jobs off the same property file, you should use the reducer count property specific to this MR, which is sts.num.reducer.
Hi Mr. Ghosh,
I've sent you an email regarding the SameTypeSimilarity class. Kindly look into it.
Jesho,
I am responding here so that others can benefit. As in the configuration in the tutorial, edit.dist.token should be set to true. Then, if your text field contains multiple words, edit distance is applied to corresponding pairs of words and the results are averaged. Edit distance is appropriate when a text field has a fixed number of words or tokens across records and your goal is to find near duplicates.
Any field not included in the JSON schema is not included in the distance computation.
The number of buckets should be (number of records / bucket size), where bucket size is in the 50 to 100 range. Passive fields are fields which don't enter the distance calculation but which you want included in the output of the map reduce.
Got it. I've set my bucket size to 2 and the input has a hundred records. There are 3 fields in total in the JSON (one ID field and two fields for distance computation). I've set num.reducer to 2. But the map reduce still takes more than 15 minutes. Any ideas to speed up the process?
Jesho,
How big are your text fields? How many words do they have? It may be related to your Hadoop installation or configuration. Please email me your JSON schema file, properties config file and a few sample lines of your input.
Hi Pranab,
I've emailed you my configuration files and sample input. Kindly have a look into it.
Because there are multiple text fields, the reducer is doing too many edit distance calculations between pairs of records. It becomes highly CPU bound, which causes the timeout. This has been reported by others also.
I will modify SameTypeSimilarity so that, as an option, you can also partition the data vertically. The load in each reducer call will go down, because each call will do distance calculations for one specific field for a set of records. An additional map reduce will be necessary to take that output and generate record pair similarities by aggregating field pair similarities.
Hi Pranab,
I want to implement Damerau Levenshtein distance in Hadoop. Can you help me with how to write the code in MapReduce?
It's in my project sifarish. Take a look.
Hello Pranab,
I am working on identity matching across multiple social media sites. For this, I am using publicly available user attributes such as profile, network and activity contents. Can you suggest other methods to improve the technique? Thank you.
You need to look into entity resolution algorithms.