Transitive Elimination

At LiveRamp, we maintain a large identity graph that groups identifiers such as hashed emails into connected components where each component represents all of the identifiers that correspond to a single person and the ID for that person.

The identity graph links different identifiers such as hashed email addresses and zip codes to unique IDs.

 

This graph changes over time as we add and remove edges connecting identifiers. For data onboarding, we want each person to be represented by the same ID even after other identifiers are added to the person’s representation. In order to maintain consistency, the ID for the person is set to the person’s oldest identifier’s ID.

We build this identity graph from a list of billions of edges that pair identifiers. These edges contain many transitive links. For example, the two edges 1 → 2 and 2 → 3 transitively connect 1 → 3. We use an algorithm we call “transitive elimination” to transform the list of input edges into a mapping from each identifier’s ID to the lowest value ID the identifier is connected to.

For example, if our input edges are:
1 → 2, 2 → 3, 2 → 4, 4 → 0, 10 → 9, 9 → 8, 14 → 15

Input graph for transitive elimination.

Our desired output would be:
1 → 0, 2 → 0, 3 → 0, 4 → 0, 10 → 8, 9 → 8, 15 → 14

Desired output of transitive elimination.

This process would be very easy if all of the edges fit in memory. However, we have several terabytes of equivalences. Therefore we use Hadoop and Cascading to resolve these transitive links in a distributed manner.
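For comparison, here is a minimal sketch of the easy in-memory case using a union-find structure. It is not the distributed algorithm described in this post, and the class and method names (InMemoryLowestId, lowestIds) are made up for illustration. It only shows the mapping we are trying to compute: each ID points to the lowest ID in its component.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative in-memory baseline: maps every ID to the lowest ID in its component. */
class InMemoryLowestId {
  // parent.get(x) links x toward its representative; a root points to itself
  private final Map<Long, Long> parent = new HashMap<>();

  private long find(long x) {
    parent.putIfAbsent(x, x);
    long root = x;
    while (parent.get(root) != root) {
      root = parent.get(root);
    }
    // path compression: point every node on the path directly at the root
    long current = x;
    while (current != root) {
      long next = parent.get(current);
      parent.put(current, root);
      current = next;
    }
    return root;
  }

  private void addEdge(long a, long b) {
    long rootA = find(a);
    long rootB = find(b);
    if (rootA == rootB) {
      return;
    }
    // the lower root always wins, so a root is always the minimum of its component
    if (rootA < rootB) {
      parent.put(rootB, rootA);
    } else {
      parent.put(rootA, rootB);
    }
  }

  /** Returns id -> lowest id in its component, omitting the lowest IDs themselves. */
  static Map<Long, Long> lowestIds(long[][] edges) {
    InMemoryLowestId graph = new InMemoryLowestId();
    for (long[] edge : edges) {
      graph.addEdge(edge[0], edge[1]);
    }
    Map<Long, Long> result = new TreeMap<>();
    for (Long id : new ArrayList<>(graph.parent.keySet())) {
      long root = graph.find(id);
      if (root != id) {
        result.put(id, root);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    long[][] edges = {{1, 2}, {2, 3}, {2, 4}, {4, 0}, {10, 9}, {9, 8}, {14, 15}};
    // prints {1=0, 2=0, 3=0, 4=0, 9=8, 10=8, 15=14}
    System.out.println(lowestIds(edges));
  }
}
```

This approach needs the whole parent map in one process, which is exactly what stops working at the scale described above.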

Conceptually, transitive elimination processes the edges on a source node by source node basis, adding or removing different types of edges to the graph iteratively until a connected component only has edges pointing to the lowest node. The algorithm only operates on edges that are not guaranteed to point to the lowest node in the component.

First of all, let us consider the algorithm itself:

  1. Loop over all remaining edges
    1. On each reducer, consider one group of edges with the same source node
      1. For each edge in each group:
        1. If the group contains only one edge and the source node is higher than the target node, remove the edge.
        2. Else if the source node for the group is a local minimum:
          1. If the group with this source node had an edge added to it last iteration:
            1. Keep the edges
          2. Else:
            1. Write the reversed edges to the “finished” output and remove them from the main iterations.
        3. Else this source node must have more than one neighbor, and one of those neighbors must have a lower value than the source node.
          1. Create a bidirectional edge between the lowest neighbor and all of the other neighbors.
          2. Keep a bidirectional edge between the lowest neighbor and the source node.
          3. Drop all edges from the other neighbors to the source node.
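The per-group rules above can be sketched in plain Java, without Cascading. This is one reading of the steps, not the production code: the GroupStep and Edge names are hypothetical, the status strings follow the file names used later in this post, and the sketch assumes that a delete_request edge cancels a matching edge in the same group and that a "normal" edge counts as an edge added in the last iteration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Illustrative, Cascading-free version of the rules applied to one group of edges. */
class GroupStep {
  /** One typed edge; type is "normal", "stable", "delete_request", or "finished". */
  static final class Edge {
    final long source;
    final long target;
    final String type;

    Edge(long source, long target, String type) {
      this.source = source;
      this.target = target;
      this.type = type;
    }

    @Override
    public String toString() {
      return source + "->" + target + ":" + type;
    }
  }

  /** Processes all edges that share one source node and returns the edges to emit. */
  static List<Edge> process(long source, List<Edge> group) {
    TreeSet<Long> deleted = new TreeSet<>();
    // target -> true if at least one edge to it is "normal" (new since the last iteration)
    TreeMap<Long, Boolean> live = new TreeMap<>();
    for (Edge edge : group) {
      if (edge.type.equals("delete_request")) {
        deleted.add(edge.target);
      } else {
        live.merge(edge.target, edge.type.equals("normal"), Boolean::logicalOr);
      }
    }
    live.keySet().removeAll(deleted); // assumption: a delete request cancels a matching edge

    List<Edge> out = new ArrayList<>();
    if (live.isEmpty()) {
      return out;
    }
    long lowest = live.firstKey();
    if (live.size() == 1 && source > lowest) {
      return out; // single edge pointing at a lower node: remove it
    }
    if (source < lowest) { // the source node is a local minimum
      boolean changed = live.containsValue(true);
      for (long target : live.keySet()) {
        out.add(changed
            ? new Edge(source, target, "stable") // keep the edge
            : new Edge(target, source, "finished")); // write the reversed edge
      }
      return out;
    }
    for (long neighbor : live.keySet()) {
      if (neighbor == lowest) {
        continue;
      }
      out.add(new Edge(lowest, neighbor, "normal"));
      out.add(new Edge(neighbor, lowest, "normal"));
      out.add(new Edge(neighbor, source, "delete_request"));
    }
    out.add(new Edge(source, lowest, "normal"));
    return out;
  }
}
```

For example, calling process(2, ...) with the normal edges 2 → 1, 2 → 3, and 2 → 4 emits the normal edges 1 → 3, 3 → 1, 1 → 4, 4 → 1, and 2 → 1, plus delete requests for 3 → 2 and 4 → 2.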

This algorithm has some properties that are desirable for distributed processing. First of all, we are able to perform the computation on groups of nodes that are able to fit in memory. Additionally, once a component is in its final state (each node pointing to the lowest node in its component) the edges for the component are removed from the processing. Therefore, we only read and process edges that are relevant to the current iteration. The number of iterations scales as O(log n), where n is the length of the longest path in a component.

Let’s walk through how we would perform this algorithm using Cascading. Please note that this is not a complete solution; there are a few edge cases and implementation details which are omitted for clarity. Also, not all of the code is provided.

First of all we start out with a list of input edges. We store these in a SequenceFile on HDFS with a “source” and a “target” field. Let us consider one of the components from the original example:
1 → 2 , 2 → 3, 2 → 4, 4 → 0

Then we will make all of the edges bidirectional:

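import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

Pipe bidirectionalEdges = new Pipe("bidirectional_edges");
bidirectionalEdges = new Each(bidirectionalEdges, new MakeBidirectional(), Fields.RESULTS);

private static class MakeBidirectional extends BaseOperation implements Function {
  public MakeBidirectional() {
    super(new Fields("source", "target"));
  }

  @Override
  public void operate(FlowProcess flow, FunctionCall call) {
    TupleEntry args = call.getArguments();
    Comparable source = (Comparable) args.getObject("source");
    Comparable target = (Comparable) args.getObject("target");
    // emit the edge in both directions
    call.getOutputCollector().add(new Tuple(source, target));
    call.getOutputCollector().add(new Tuple(target, source));
  }
}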

For our example, the bidirectional store would contain:
0 → 4, 1 → 2, 2 → 1, 2 → 3, 2 → 4, 3 → 2, 4 → 0, 4 → 2

Here is the main loop of the transitive elimination algorithm:

// check to see if there are any more edges to process
while(hasEdgesToProcess(iteration)) {
    // create a pipe to handle the edges
    Pipe edges = new Pipe("edges");

    // sort the edges and create groups with the same source field
    edges = new GroupBy(edges, new Fields("source"));

    // in reducers, process each group
    edges = new Every(edges, new Fields("source", "target", "type"),
       new ProcessGroup(),
       Fields.RESULTS);

    // complete the Cascading flow for each iteration
    iteration++;
}

We use the entire bidirectional file for the first iteration. We manually add a “type” field with the value set to “normal.” For each of our iterations, we use TemplateTaps to separate the output into SequenceFiles based on the following format:
/trans_elim/<iteration>/<status>

The status can be normal, stable, delete_request, or finished. The normal status is given to edges that have a source with at least one new outgoing edge in the last iteration. Stable edges have source nodes that did not change. Delete request edges are edges that we want to remove in the next step of the algorithm. Finished edges are the reversed edges of a component that has reached its final state, and they take no further part in the iterations.
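As a small illustration of this layout, the sketch below builds the output path for a given iteration and status, and lists the paths an iteration would read from the previous one. The TransElimPaths class and its method names are hypothetical helpers, not part of Cascading or of our codebase. The choice of inputs (normal, stable, and delete_request from the previous iteration) follows the walkthrough later in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Illustrative helper for the /trans_elim/<iteration>/<status> layout. */
class TransElimPaths {
  enum Status {
    NORMAL, STABLE, DELETE_REQUEST, FINISHED;

    /** Directory name used on HDFS, e.g. "delete_request". */
    String dirName() {
      return name().toLowerCase(Locale.ROOT);
    }
  }

  static String path(int iteration, Status status) {
    return "/trans_elim/" + iteration + "/" + status.dirName();
  }

  /** Paths read by the given iteration: every unfinished status of the previous one. */
  static List<String> inputsFor(int iteration) {
    List<String> inputs = new ArrayList<>();
    for (Status status : Status.values()) {
      if (status != Status.FINISHED) {
        inputs.add(path(iteration - 1, status));
      }
    }
    return inputs;
  }
}
```

For example, inputsFor(2) returns the three paths /trans_elim/1/normal, /trans_elim/1/stable, and /trans_elim/1/delete_request.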

The ProcessGroup class simply processes every edge with the same source node:

protected static class ProcessGroup extends BaseOperation implements Buffer {
  public ProcessGroup() {
    // TYPED_EDGE_FIELDS is assumed here to hold the "source", "target", and "type" fields
    super(TYPED_EDGE_FIELDS.size(), TYPED_EDGE_FIELDS);
  }

  @Override
  public void operate(FlowProcess flow, BufferCall call) {
    final TupleEntryCollector output = call.getOutputCollector();
    Iterator<TupleEntry> iterator = call.getArgumentsIterator();
    // load all of the edges for this source node into memory
    // apply the if statements from step 1.1.1 of the algorithm description
  }
}

Let’s see how this algorithm processes our example edges:
0 → 4, 1 → 2, 2 → 1, 2 → 3, 2 → 4, 3 → 2, 4 → 0, 4 → 2

Iteration 1
We process five separate groups of edges.

0 → 4
Zero is a local minimum. There is no previous run, so its edge still has the initial type normal rather than stable. We therefore do not treat it as finished yet and output it to the stable file for iteration 1 on HDFS.

1 → 2
This edge is also sent to the stable file by the same line of reasoning.

2 → 1, 2 → 3, 2 → 4
Node 2 has a neighbor with a lower value and more than one neighbor. We create bidirectional edges between the minimum neighbor and the other neighbors and write them to the normal file for iteration 1 on HDFS (1 → 3, 3 → 1, 1 → 4, and 4 → 1). We also write the link to the lowest neighbor to the normal file (2 → 1). We add delete requests for the incoming connections (3 → 2 and 4 → 2).

3 → 2
This group has only one edge and the source is higher than the target, so we write it to iteration 1’s delete_request file on HDFS. Note: this is a duplicate edge. However, ProcessGroup deduplicates the edges it processes.

4 → 0, 4 → 2
Node 4 also has a neighbor with a lower value and more than one neighbor. We create bidirectional edges between the lowest neighbor and the other neighbor (0 → 2 and 2 → 0) and write them to the normal file. We keep 4 → 0 in the normal file. Also, we add a delete_request for the incoming edge 2 → 4.

Now we have finished our first iteration! Our files on HDFS contain the following:
/trans_elim/1/normal
2 → 1
1 → 3
3 → 1
1 → 4
4 → 1
4 → 0
0 → 2
2 → 0

/trans_elim/1/stable
0 → 4
1 → 2

/trans_elim/1/delete_request
3 → 2
4 → 2
2 → 4

Iteration 2
For the second iteration, we process each group built out of the normal, stable, and delete_request edges from the first iteration.

0 → 2, 0 → 4
Zero is a local minimum. One of its edges came from the normal set and one came from the stable set, so we write 0 → 2 and 0 → 4 to the stable file for iteration 2.

1 → 2, 1 → 3, 1 → 4
One is also a local minimum. Similarly, we write all three edges to the stable file for iteration 2.

2 → 0, 2 → 1, 2 → 4
Two has multiple neighbors and a neighbor lower than its own value. We create edges for the minimum neighbor and emit them to the normal file (0 → 1, 1 → 0). We did not create edges between the minimum node and the node that was only in delete_request. We keep the link to the lowest value (2 → 0) in the normal file and request the incoming edge (1 → 2) to be deleted.

3 → 1, 3 → 2
Similarly, we advertise the minimum neighbor (1 → 2 and 2 → 1) and keep the connection to the lowest (3 → 1) in the normal file. 3 → 2 was already in delete_request so it is dropped here.

4 → 2, 4 → 0, 4 → 1
Again, we advertise the minimum neighbor (0 → 1, 1 → 0) in the normal file and keep the lowest connection (4 → 0). 4 → 2 was already in delete_request, so it is dropped.

At the end of the second iteration we have:
/trans_elim/2/normal
2 → 0
0 → 1
1 → 0
4 → 0
0 → 1
1 → 0

/trans_elim/2/stable
0 → 2
0 → 4
1 → 2
1 → 3
1 → 4

/trans_elim/2/delete_request
1 → 2
1 → 4

Iteration 3
Now we process each group for the third iteration.

0 → 1, 0 → 2, 0 → 4
This is a local min with new edges, so keep the edges in the stable file for this iteration.

1 → 0, 1 → 2, 1 → 3, 1 → 4
1 → 0 and 1 → 3 are the only edges that are not cancelled by delete requests. We connect the minimum neighbor to the other neighbor (0 → 3, 3 → 0) and keep the link to the minimum neighbor (1 → 0), all in the normal file. Also, we create a delete request for 3 → 1.

2 → 0
Drop 2 → 0 since the source only has one neighbor which is lower than its own value.

4 → 0
Similarly, drop 4 → 0.

At the end of the third iteration we have:
/trans_elim/3/normal
1 → 0
0 → 3
3 → 0

/trans_elim/3/stable
0 → 1
0 → 2
0 → 4

/trans_elim/3/delete_request
3 → 1

Iteration 4
0 → 1, 0 → 2, 0 → 3, 0 → 4
This is a local min with new edges, so write the edges to the stable file for this iteration.

1 → 0
Drop 1 → 0 since the source only has one neighbor which is lower than its own value.

3 → 0
Similarly, drop 3 → 0.

At the end of the fourth iteration we have:
/trans_elim/4/stable
0 → 1
0 → 2
0 → 3
0 → 4

Iteration 5
All of the edges in Iteration 4 are in the stable set. Therefore, we output each of the reversed edges to the finished file on HDFS.
/trans_elim/5/finished
1 → 0
2 → 0
3 → 0
4 → 0
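To check a walkthrough like this one by hand, it can help to simulate the iterations in memory. The sketch below is an illustrative, single-process reading of the algorithm, not our Cascading job. The class name, the numeric type codes, and the iteration cap are assumptions made for the example, and it treats a delete request as cancelling a matching edge in the same group.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Illustrative in-memory simulation of the iterations walked through above. */
class TransElimSimulation {
  // each edge is {source, target, type}
  static final long NORMAL = 0, STABLE = 1, DELETE_REQUEST = 2;

  static Map<Long, Long> run(long[][] input) {
    List<long[]> edges = new ArrayList<>();
    for (long[] e : input) { // make every edge bidirectional and type it as normal
      edges.add(new long[] {e[0], e[1], NORMAL});
      edges.add(new long[] {e[1], e[0], NORMAL});
    }
    Map<Long, Long> finished = new TreeMap<>();
    for (int iteration = 1; !edges.isEmpty(); iteration++) {
      if (iteration > 100) { // arbitrary safety cap for this sketch
        throw new IllegalStateException("did not converge");
      }
      Map<Long, List<long[]>> groups = new TreeMap<>(); // group edges by source node
      for (long[] e : edges) {
        groups.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e);
      }
      List<long[]> next = new ArrayList<>();
      for (Map.Entry<Long, List<long[]>> group : groups.entrySet()) {
        processGroup(group.getKey(), group.getValue(), next, finished);
      }
      edges = next;
    }
    return finished;
  }

  static void processGroup(long source, List<long[]> group,
                           List<long[]> next, Map<Long, Long> finished) {
    Set<Long> deleted = new HashSet<>();
    TreeMap<Long, Boolean> live = new TreeMap<>(); // target -> reached by a normal edge?
    for (long[] e : group) {
      if (e[2] == DELETE_REQUEST) {
        deleted.add(e[1]);
      } else {
        live.merge(e[1], e[2] == NORMAL, Boolean::logicalOr);
      }
    }
    live.keySet().removeAll(deleted);
    if (live.isEmpty()) {
      return;
    }
    long lowest = live.firstKey();
    if (live.size() == 1 && source > lowest) {
      return; // drop a single edge that points at a lower node
    }
    if (source < lowest) { // local minimum
      boolean changed = live.containsValue(true);
      for (long target : live.keySet()) {
        if (changed) {
          next.add(new long[] {source, target, STABLE});
        } else {
          finished.put(target, source); // reversed edge goes to the finished output
        }
      }
      return;
    }
    for (long neighbor : live.keySet()) {
      if (neighbor == lowest) {
        continue;
      }
      next.add(new long[] {lowest, neighbor, NORMAL});
      next.add(new long[] {neighbor, lowest, NORMAL});
      next.add(new long[] {neighbor, source, DELETE_REQUEST});
    }
    next.add(new long[] {source, lowest, NORMAL});
  }

  public static void main(String[] args) {
    long[][] input = {{1, 2}, {2, 3}, {2, 4}, {4, 0}, {10, 9}, {9, 8}, {14, 15}};
    // prints {1=0, 2=0, 3=0, 4=0, 9=8, 10=8, 15=14}
    System.out.println(run(input));
  }
}
```

Run on the full set of input edges from the start of this post, the sketch yields the desired mapping, with the component walked through above finishing in the fifth iteration.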

If you would prefer to simply view each type of edge for each iteration in one place, we have a Google Spreadsheet for this example.

Also, a visual representation of this algorithm is available that starts from the state of the graph after the first iteration:

And there you have our approach to transitive elimination: an efficient and distributed method to separate a graph into components and propagate the minimum label across each component. If you’re interested in working on similar large scale distributed problems, check out our career opportunities.