Computing Distributions of Large Datasets with Cascading and the q-digest Algorithm

Distributions are a powerful tool for understanding datasets. As an example, imagine that you’re interested in quantifying user engagement for a new app you’re developing. To this end you compute the distribution of monthly engagement time for your users and discover the following trends:

Example distribution

You learn that most of your “users” rarely spend any time using the app. These individuals soon forget about the app after giving it a try. You also learn that you have a few thousand dedicated users who spend over 20 hours a month using your app. This gives you valuable information to guide your future development and marketing plans for this app.

This post gives a quick overview of how to compute such distributions for large datasets using Hadoop and Cascading. It also shows an efficient way to compute the quantiles of a distribution using the q-digest algorithm. Further, we’ll show how this algorithm can be leveraged with LiveRamp’s Combiner library to easily and efficiently compute the quantiles of many different distributions simultaneously (i.e. within a single MapReduce job).

To demonstrate these methods we’ll show examples of computing distributions from one of LiveRamp’s large datasets: our logs of web traffic.1 A single month of these logs includes hundreds of billions of HTTP requests from over a billion unique browsers. We’re a little curious about the types of IP addresses on which we see these browsers: do we primarily see residential IP addresses with a small number of requests each, or large shared IP addresses (like universities) with a huge number of requests each? We can answer this question by computing the distribution of requests per IP address.

Computing a distribution with Cascading

We can compute this distribution using two Cascading CountBy assemblies: one to compute how many requests we see for each IP address, and a second to compute how many IP addresses have n requests for each value of n (i.e. how many IP addresses had just one request, how many had two, and so on). Here’s the Cascading code:

Pipe logs = new Pipe("logs");
// Extract the IP address from each request log entry
Pipe ips = new Each(logs, new Fields("log_entry"), new GetIpFromRequestLog(), new Fields("ip"));
// First CountBy: requests per IP address; second: IP addresses per request count
Pipe ipRequests = new CountBy(ips, new Fields("ip"), new Fields("ip_requests"));
Pipe dist = new CountBy(ipRequests, new Fields("ip_requests"), new Fields("count"));


Running this Cascading flow on one month of our web logs gives the following distribution:

IP Traffic Distribution

Plotted on a log-log scale, we see a roughly power-law distribution: most IP addresses have a small number of requests, while a few IP addresses have a very large number. In other words, most of the IP addresses we see are residential, but we still see a significant volume of traffic from shared IP addresses (e.g. universities or cell towers). This is seen even more clearly by transforming the distribution into its empirical cumulative probability distribution, accomplished by cumulatively summing the counts and normalizing by the total.
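For concreteness, here’s a minimal sketch of that transformation in plain Java, assuming we’ve loaded the distribution into a sorted map from request count to the number of IP addresses with that count (the class and method names here are illustrative, not part of the Cascading flow):

import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class EmpiricalCdf {

  // Turns a distribution (value -> count) into an empirical cumulative
  // probability distribution (value -> fraction of observations <= value)
  // by cumulatively summing the counts and normalizing by the total.
  public static SortedMap<Long, Double> toCdf(SortedMap<Long, Long> distribution) {
    long total = 0;
    for (long count : distribution.values()) {
      total += count;
    }
    SortedMap<Long, Double> cdf = new TreeMap<>();
    long runningSum = 0;
    for (Map.Entry<Long, Long> entry : distribution.entrySet()) {
      runningSum += entry.getValue();
      cdf.put(entry.getKey(), (double) runningSum / total);
    }
    return cdf;
  }
}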

Cumulative IP Traffic Distribution

Here we can see that the majority of IP addresses (90%) have fewer than 300 requests and only a very small fraction (0.1%) have more than 20,000 requests. These points along the cumulative probability distribution are the quantiles of the distribution (e.g. the values of the distribution at the 90th and 99.9th quantiles, respectively), and they provide a concise summary of a distribution. These summary statistics are particularly valuable if we want to compare different distributions. For example, if we want to analyze how a distribution changes over time, it is simpler to compare a few quantiles than to plot multiple distributions against each other. Therefore we’ll commonly track just the quantiles of a distribution rather than the whole distribution itself. The following plot highlights several quantiles of our example distribution “HTTP Requests per IP Address”, with the quantile values given in the subsequent table.

Quantiles and Cumulative Distribution

Quantile   HTTP Requests per IP Address, n
25%        3
50%        22
75%        95
90%        307
99%        2,153
99.9%      22,507
100%       116,433,791
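Reading a quantile off the cumulative distribution is then just a matter of scanning for the first value whose cumulative probability reaches the desired level. A small sketch, which could live in the same illustrative EmpiricalCdf class as the toCdf method above:

// Returns the smallest value whose cumulative probability is >= q,
// e.g. q = 0.9 gives the 90% quantile.
public static long quantile(SortedMap<Long, Double> cdf, double q) {
  for (Map.Entry<Long, Double> entry : cdf.entrySet()) {
    if (entry.getValue() >= q) {
      return entry.getKey();
    }
  }
  return cdf.lastKey(); // guards against floating-point rounding at q = 1.0
}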


These quantiles provide a great summary of the distribution, but we still had to compute the full distribution to determine them. Computing the distribution can be a moderately expensive Hadoop job if the distribution has many unique values. It would be great to have a more efficient approach that computes just the quantiles.

Computing quantiles with the q-digest algorithm

This can be accomplished with the q-digest algorithm.2 This algorithm approximates the full distribution by counting the occurrences of values within ranges of values: e.g. as opposed to knowing that exactly 8,313 IP addresses had 1,572 requests in the last month, we’ll know that 440,871 IP addresses had between 1,500 and 1,550 requests. By only considering ranges of values we use less memory to represent the distribution, but lose some accuracy in determining the quantile values of the distribution. The q-digest algorithm adaptively chooses these ranges based on the specific dataset it is applied to and the configured level of approximation (referred to as the compression factor). Additionally, the q-digest algorithm is well suited for parallelization across multiple chunks of data, with partial results merged at the end (as happens in MapReduce jobs).
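To make these properties concrete, here’s a minimal sketch that uses stream-lib’s QDigest directly, outside of any Cascading flow. We build separate digests over two chunks of data, as two mappers would, merge them, and query the merged digest; the data here is made up purely for illustration.

import com.clearspring.analytics.stream.quantile.QDigest;

public class QDigestDemo {
  public static void main(String[] args) {
    double compressionFactor = 1000.0;

    // Build separate digests over two chunks of data, as two mappers would
    QDigest chunk1 = new QDigest(compressionFactor);
    QDigest chunk2 = new QDigest(compressionFactor);
    for (long i = 1; i <= 500000; i++) {
      chunk1.offer(i);
    }
    for (long i = 500001; i <= 1000000; i++) {
      chunk2.offer(i);
    }

    // Merge the partial digests, as happens when combiner results are joined
    QDigest merged = QDigest.unionOf(chunk1, chunk2);

    // Query approximate quantiles; the exact answers are 500000 and 900000
    System.out.println("50% quantile ~ " + merged.getQuantile(0.5));
    System.out.println("90% quantile ~ " + merged.getQuantile(0.9));
  }
}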

To easily use the q-digest algorithm in Cascading flows, we’ve recently wrapped stream-lib’s q-digest implementation in LiveRamp’s open source Combiner library. Using this new Combiner, we can apply the q-digest algorithm to our example “HTTP Requests per IP Address” investigation with the following code:

// A higher compression factor gives better accuracy at the cost of more memory
double compressionFactor = 1000.0;

Pipe logs = new Pipe("logs");
Pipe ips = new Each(logs, new Fields("log_entry"), new GetIpFromRequestLog(), new Fields("ip"));
Pipe ipRequests = new CountBy(ips, new Fields("ip"), new Fields("ip_requests"));
// Fields.NONE: no grouping fields, so all request counts go into one q-digest
Pipe qDigest = new Combiner(
    ipRequests,
    new QuantileExactAggregator(compressionFactor),
    Fields.NONE,
    new Fields("ip_requests"),
    new Fields("q_digest")
);


Applying this Cascading flow to our logs dataset gives us a QDigest object as its result, and we can query this object for the approximate quantile values of interest. The following table compares the results from the q-digest algorithm to those computed from the full distribution.
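For example, the q-digest column of the table below could be produced with something like the following hypothetical helper, assuming the “q_digest” field of the output tuple holds a stream-lib QDigest (if the Combiner emits it in serialized form, QDigest.deserialize would apply first):

import cascading.tuple.TupleEntry;
import com.clearspring.analytics.stream.quantile.QDigest;

// Illustrative helper: query several quantiles from the flow's output tuple
public static void printQuantiles(TupleEntry resultTuple) {
  QDigest digest = (QDigest) resultTuple.getObject("q_digest");
  for (double q : new double[] {0.25, 0.5, 0.75, 0.9, 0.99, 0.999, 1.0}) {
    System.out.println(q + " quantile ~ " + digest.getQuantile(q));
  }
}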

Quantile   HTTP Requests per IP Address, n
           From Distribution   From q-digest
25%        3                   5
50%        22                  23
75%        95                  97
90%        307                 367
99%        2,153               4,095
99.9%      22,507              2,097,151
100%       116,433,791         134,217,727


They’re quite close for the smaller quantiles, but a significant deviation is seen at the 99.9% quantile. This is due to the limited precision with which the q-digest algorithm can estimate quantile values: in this case the extreme values near the 100% quantile bias our estimate of the 99.9% quantile upwards. Note that this doesn’t significantly impact our estimate of the maximum value.

The Combiner approach is particularly powerful if we want to compute the quantiles of multiple distributions of a dataset simultaneously (i.e. in one MapReduce job). For example, we might want to compute the “HTTP Requests per IP Address” distribution for each of LiveRamp’s partners that send us traffic. Computing all of the distributions using simple CountBys would be expensive and would further require a lot of post-processing to compute a set of quantiles from each distribution. In contrast, this is easily done with Combiners:

Pipe logs = new Pipe("logs");
// Extract the IP address, keeping the raw log entry for the next step
Pipe ips = new Each(
    logs,
    new Fields("log_entry"), new GetIpFromRequestLog(),
    new Fields("log_entry", "ip")
);
// Extract the partner, keeping just the (ip, partner) pair
ips = new Each(
    ips,
    new Fields("log_entry"), new GetPartnerFromRequestLog(),
    new Fields("ip", "partner")
);
// Count requests per (ip, partner) pair
Pipe ipRequests = new CountBy(ips, new Fields("ip", "partner"), new Fields("ip_requests"));
// Grouping on "partner" produces one q-digest per partner
Pipe qDigest = new Combiner(
    ipRequests,
    new QuantileExactAggregator(),
    new Fields("partner"),
    new Fields("ip_requests"),
    new Fields("q_digest")
);

We’ve found this Combiner approach to quantile computation incredibly useful for computing many of the key stats that are essential to the quality of LiveRamp’s products. We use it in several of our automatic QA stats workflows, each of which regularly computes the quantiles of up to 10,000 distributions in a single MapReduce job.

Endnotes

  1. These HTTP requests are part of LiveRamp’s integrations with online marketing platforms. For more details on this system see our Data Onboarding Overview and Server Side Distribution posts.
  2. For more details on the q-digest algorithm see Shrivastava, Nisheeth, et al., “Medians and Beyond: New Aggregation Techniques for Sensor Networks,” SenSys ’04, ACM, 2004; http://info.prelert.com/blog/q-digest-an-algorithm-for-computing-approximate-quantiles-on-a-collection-of-integers; and http://marios.io/2011/07/31/q-digest/