At LiveRamp, many of our Hadoop workflows join two datasets together. (More datasets are supported, but for simplicity this post covers the two-dataset case.) To join two datasets efficiently, both have to be sorted by the join key, which normally happens in the Shuffle phase of a MapReduce job.
In general we try to avoid Shuffles and the Reduces that follow them, because they require copying lots of data over the network. To get around this, we make sure to always keep the data in sorted order on HDFS, so that no expensive sorting step is needed when we read it back and the join can be done on the Map side. This is called a Map-Side Join, Sort-Merge Join, or MSJ (not to be confused with a HashJoin, which keeps the entire smaller side in memory), and it is used extensively throughout our Hadoop workflows. We will explain the inner workings of Map-Side Joins in a future blog post.
Even though MSJs are efficient, in certain cases they are not as efficient as they could be. Imagine we have two sorted datasets on HDFS, one a few gigabytes in size and the other a few terabytes. When we join them, both stores have to be iterated over fully, even though only a fraction of the records in the larger store take part in the join: for every record on the small side, we step through records on the large side one at a time until we reach a record with the same key as the current small-side record.
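To make that cost concrete, here is a minimal in-memory sketch of a plain sort-merge join in Python. It is not LiveRamp's implementation: `merge_join` is a hypothetical name, each side is assumed to be a key-sorted list of `(key, value)` pairs with unique keys, and the join is the left join implied by the pseudo code later in this post. Even this version, which stops once the small side runs out, has to step over every large-side record that precedes the last small-side key.

```python
from typing import List, Optional, Tuple

Row = Tuple[int, str, Optional[str]]

def merge_join(small: List[Tuple[int, str]],
               large: List[Tuple[int, str]]) -> Tuple[List[Row], int]:
    """Plain sort-merge (left) join of two key-sorted lists of (key, value)
    pairs with unique keys. Returns the joined rows and the number of
    single-record steps taken on the large side."""
    rows: List[Row] = []
    pos = 0       # cursor into the large side; it only ever moves forward
    advances = 0  # large-side records stepped over one at a time
    for key, small_val in small:
        # Without an index, the only way forward is one record at a time.
        while pos < len(large) and large[pos][0] < key:
            pos += 1
            advances += 1
        if pos < len(large) and large[pos][0] == key:
            rows.append((key, small_val, large[pos][1]))
        else:
            rows.append((key, small_val, None))  # no match on the large side
    return rows, advances

large = [(k, f"v{k}") for k in range(1000)]
rows, advances = merge_join([(3, "a"), (997, "b")], large)
print(rows, advances)  # [(3, 'a', 'v3'), (997, 'b', 'v997')] 997
```

Only two small-side keys are joined, yet 997 large-side records are stepped over to find them.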
To avoid reading the entire large store every time we do an MSJ against it, we came up with a solution we call Seek-MSJ, inspired by the way indexes are used in SQL databases and in HBase. Seek-MSJ builds an index over the large store so that the parts we don't need can be skipped. While it would be possible to store the larger dataset in a key-value store like HBase, this approach fits our use case better for two reasons:
- Sorted data. Because the keys we query for arrive in sorted order, we only ever need to seek forward. This drastically improves the efficiency of the seeks, especially on large datasets. Many of our datasets have billions of records on the large side, and we query them with a couple of million keys in a matter of minutes.
- Ease of use. With Seek-MSJ we can keep our existing datastores and only need to store some additional information alongside them. To use a random-lookup key-value store instead, the data would first have to be explicitly transformed into a format that the store supports.
When writing a large dataset that we want indexed, every mapper samples one record out of every X. It takes the key from that record and writes it, together with the record's byte position in the output, to a separate index file unique to that mapper. If we sample once every 2 records, the writing process looks like this:
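As a rough illustration of the sampling step, the Python sketch below writes records to an in-memory buffer and keeps a sparse index of `(key, byte_offset)` entries. Several details are assumptions rather than LiveRamp's actual format: the record layout (`HEADER`: 8-byte key, 4-byte value length, then the value), the choice to sample the first record and every X-th one after it, and the hypothetical `write_with_index` function returning the index as a list instead of writing a per-mapper `.index` file.

```python
import io
import struct
from typing import BinaryIO, Iterable, List, Tuple

# Hypothetical record layout: 8-byte big-endian key, 4-byte value length,
# then the value bytes. A real datastore would use its own format.
HEADER = struct.Struct(">QI")

def write_with_index(records: Iterable[Tuple[int, bytes]], out: BinaryIO,
                     every: int) -> List[Tuple[int, int]]:
    """Write key-sorted (key, value) records to `out` and return a sparse
    index of (key, byte_offset) entries, one per `every` records
    (assumption: sampling starts with the first record)."""
    index: List[Tuple[int, int]] = []
    for n, (key, value) in enumerate(records):
        if n % every == 0:
            # Note where this record starts so a reader can seek straight to it.
            index.append((key, out.tell()))
        out.write(HEADER.pack(key, len(value)))
        out.write(value)
    return index

buf = io.BytesIO()
idx = write_with_index([(k, b"v%d" % k) for k in (10, 20, 30, 40, 50)], buf, every=2)
print(idx)  # [(10, 0), (30, 30), (50, 60)]
```

Each record here is 15 bytes (12-byte header plus a 3-byte value), so sampling every 2 records yields index entries at offsets 0, 30, and 60. A larger X keeps the index smaller at the cost of longer forward scans when reading.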
When reading, each mapper has its own .index file. As the key-value pairs from the smaller side come through, we look up each pair's key in the index to find where the matching record would be in the larger side. We then skip over all intermediate records by seeking to the byte position of the closest indexed key that is less than or equal to it. If the index contains only a smaller key and not the exact key, we iterate through the following records, starting from either the current position or the position found in the index (whichever is further along), until we find the record we're looking for or reach a larger key, which means the key does not exist in the data.
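The lookup described above is a "floor" search, the same operation as Java's `TreeMap.floorEntry`. A minimal Python sketch, assuming the index has been loaded into two parallel sorted lists (`keys` and `positions`), can use the standard `bisect` module; `floor_entry` and `scan_start` are hypothetical helper names.

```python
import bisect
from typing import List, Optional, Tuple

def floor_entry(keys: List[int], positions: List[int],
                key: int) -> Optional[Tuple[int, int]]:
    """Return the (key, position) index entry with the greatest key <= `key`,
    like java.util.TreeMap.floorEntry, or None if every indexed key is larger.
    `keys` must be sorted and `positions[i]` is the offset of `keys[i]`."""
    i = bisect.bisect_right(keys, key)
    return (keys[i - 1], positions[i - 1]) if i > 0 else None

def scan_start(keys: List[int], positions: List[int],
               key: int, current: int) -> int:
    """Where to start scanning forward for `key`: the indexed position or the
    current position, whichever is further along. Never seeks backwards."""
    entry = floor_entry(keys, positions, key)
    return current if entry is None else max(current, entry[1])

keys, positions = [10, 30, 50], [0, 30, 60]
print(scan_start(keys, positions, 40, current=0))   # 30: seek to the entry for key 30
print(scan_start(keys, positions, 40, current=45))  # 45: already past that entry
```

Taking the maximum of the two positions is what keeps every seek moving forward, which is safe only because the small-side keys arrive in sorted order.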
Pseudocode for the joining process (not taking into account multiple values per side or error handling):
indexEntry = index.floorEntry(smallStream.currentKey)
if indexEntry != null and indexEntry.key > largeStream.currentKey
    largeStream.seekTo(indexEntry.position)
while largeStream.currentKey < smallStream.currentKey
    largeStream.advanceOneRecord()
if largeStream.currentKey == smallStream.currentKey
    return smallStream.currentKey, smallStream.currentRecord, largeStream.currentRecord
else
    return smallStream.currentKey, smallStream.currentRecord
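For experimentation, the pseudo code above can be translated into a small runnable Python sketch. It is a simplification under stated assumptions, not the production implementation: both sides are in-memory key-sorted lists with unique keys, list indices stand in for byte offsets, the index is built inline with a hypothetical `every` parameter playing the role of X, and the seek test compares positions rather than keys (equivalent on sorted data).

```python
import bisect
from typing import List, Optional, Tuple

Row = Tuple[int, str, Optional[str]]

def seek_join(small: List[Tuple[int, str]], large: List[Tuple[int, str]],
              every: int) -> Tuple[List[Row], int]:
    """Left-join `small` against `large` (both key-sorted, unique keys) using
    a sparse index over `large`. List indices stand in for byte offsets.
    Returns the joined rows and the number of single-record advances made on
    the large side (seeks are not counted)."""
    # Sparse index: key and position of every `every`-th large record.
    index_pos = list(range(0, len(large), every))
    index_keys = [large[p][0] for p in index_pos]
    rows: List[Row] = []
    pos = 0
    advances = 0
    for key, small_val in small:
        i = bisect.bisect_right(index_keys, key)  # floorEntry(key)
        # Seek only forward; on sorted data a later position means a larger key.
        if i > 0 and index_pos[i - 1] > pos:
            pos = index_pos[i - 1]  # seekTo(indexEntry.position)
        while pos < len(large) and large[pos][0] < key:
            pos += 1  # advanceOneRecord()
            advances += 1
        if pos < len(large) and large[pos][0] == key:
            rows.append((key, small_val, large[pos][1]))
        else:
            rows.append((key, small_val, None))
    return rows, advances

large = [(k, f"v{k}") for k in range(1000)]
rows, advances = seek_join([(3, "a"), (997, "b")], large, every=10)
print(rows, advances)  # [(3, 'a', 'v3'), (997, 'b', 'v997')] 10
```

With an index entry every 10 records, the two lookups step over 3 + 7 = 10 large-side records, whereas a plain merge join on the same inputs would step over 997. In general each small-side key costs at most `every` single-record steps after the seek, so the sampling interval trades index size against scan length.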
The joining process looks like this:
When the small and large stores differ greatly in size, Seek-MSJ can be significantly faster than a normal MSJ, up to 10x.