Introducing MockRDD for testing PySpark code

Summary

The LiveRamp Identity Data Science team is excited to share some of our PySpark testing infrastructure in the new open-source library mockrdd. It contains the class MockRDD, which mirrors the behavior of a PySpark RDD with several additions:

  • Extensive sanity checks to identify invalid inputs
  • More meaningful error messages for debugging issues
  • Straightforward to run within pdb
  • Removes Spark dependencies from development and testing environments
  • No Spark overhead when running through large test suites

Here’s a mini example of how to use MockRDD.

from mockrdd import MockRDD
def job(rdd):
    return rdd.map(lambda x: x*2).filter(lambda x: x>3)

assert job(MockRDD.empty()).collect() == []
assert job(MockRDD.of(1)).collect() == []
assert job(MockRDD.of(2)).collect() == [4]

We’ve found MockRDD to be quite helpful in developing and testing PySpark jobs and we’re happy to finally share it with the greater PySpark community. In the remainder of this blog post, I’ll walk you through some common uses of MockRDD and demonstrate how it provides additional functionality over using PySpark in local mode.

PySpark Background

Feel free to skip this section if you’re already familiar with PySpark. At a high level, PySpark is a distributed data processing system based on performing deterministic operations on immutable collections of values. The core component of PySpark is the Resilient Distributed Dataset (RDD), which represents a collection of values. An RDD can be created by reading data from distributed storage (e.g., the Hadoop Distributed File System, HDFS). You can perform an operation on an RDD to generate a new RDD that represents the results of that operation. E.g., you could map a function across all values in one RDD to generate a new RDD that contains the result of this function for each value in the original RDD.

The RDD is a high-level abstraction within PySpark that exists within the driver Python process for your PySpark job. Further, RDDs are lazily evaluated in that they aren’t computed until needed. E.g., applying a map function to one RDD immediately returns the new RDD and no computation is performed. You can request that the results of an RDD be persisted to distributed storage to force a computation. Additionally, if you know the results of your final RDD are small (e.g., the results of an aggregation), then you can load them into the memory of the driver Python process using the method collect.
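This laziness can be illustrated with plain Python generators; the sketch below is a conceptual analogy, not how PySpark is actually implemented:

```python
# Conceptual sketch of lazy evaluation using a plain Python generator;
# this is an analogy for RDD laziness, not PySpark's implementation.
calls = []

def double(x):
    calls.append(x)  # record when the function is actually invoked
    return x * 2

lazy = (double(x) for x in range(3))  # building the pipeline runs nothing
assert calls == []                    # no computation has happened yet

result = list(lazy)                   # like collect(): forces evaluation
assert result == [0, 2, 4]
assert calls == [0, 1, 2]
```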

In unit testing PySpark jobs, one conventionally runs PySpark in local mode, whereby all computation happens on the single machine running the driver process. The testing code creates test data and loads it into an RDD. One then applies the PySpark job’s operations to this test RDD, collects the results, and checks that they are consistent with the expected results for the given test data. As discussed in the remainder of this blog post, our new MockRDD library improves upon running PySpark in local mode for testing.

Examples of Extra Sanity Checks and Improved Error Messages

MockRDD is designed for testing and therefore adds many checks to ensure inputs and operations are valid. These checks would pose too much overhead for inclusion in production PySpark jobs processing massive amounts of data, but they’re cheap to run on small testing inputs.

To demonstrate how to use MockRDD, I’ll consider example PySpark jobs based on processing made-up log data. Each log entry has a server name, timestamp, remote IP address, and URI in CSV format. E.g.,

server0,1539015865,127.0.0.1,/index.html

A utility function parse_log_entry is defined as follows in util.py:

from datetime import datetime

def parse_log_entry(log):
    server_name, timestamp, remote_ip, uri = log.split(',')
    return dict(server_name=server_name,
                timestamp=datetime.fromtimestamp(int(timestamp)),
                remote_ip=remote_ip,
                uri=uri)
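For instance, the function turns the example entry above into a dictionary (the definition is repeated here so the snippet stands alone; note that fromtimestamp interprets the timestamp in the local time zone):

```python
from datetime import datetime

# Repeated from util.py so this snippet is self-contained.
def parse_log_entry(log):
    server_name, timestamp, remote_ip, uri = log.split(',')
    return dict(server_name=server_name,
                timestamp=datetime.fromtimestamp(int(timestamp)),
                remote_ip=remote_ip,
                uri=uri)

entry = parse_log_entry('server0,1539015865,127.0.0.1,/index.html')
assert entry['server_name'] == 'server0'
assert entry['remote_ip'] == '127.0.0.1'
assert entry['uri'] == '/index.html'
assert isinstance(entry['timestamp'], datetime)
```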

Example: Detecting Invalid Key/Value Pairs

For example, consider the following PySpark job, which counts the number of unique values per key but incorrectly generates triplets instead of key/value pairs.

import typing
from pyspark import RDD

from util import parse_log_entry

def get_server_ip_uri(log_entry):
    return log_entry['server_name'], log_entry['remote_ip'], log_entry['uri']

def create_combiner(value) -> set:
    return {value}

def merge_value(values: set, value) -> set:
    values.add(value)
    return values

def merge_combiners(a: set, b: set) -> set:
    a.update(b)
    return a

def job(rdd: RDD) -> typing.List[typing.Tuple[str, int]]:
    return (rdd
            .map(parse_log_entry)
            .map(get_server_ip_uri)
            .combineByKey(create_combiner, merge_value, merge_combiners)
            .mapValues(len)
            .collect())

We could test this code using PySpark local mode as follows.

from pyspark import SparkContext

from invalid_key_value_pairs import job

logs = ['server0,1539015865,127.0.0.1,/index.html']

sc = SparkContext()
results = job(sc.parallelize(logs))
print(results)

This leads to the following exception in the executor Python process.

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
…, line 236, in mergeValues
    for k, v in iterator:
ValueError: too many values to unpack (expected 2)

Note that this is only part of a much longer traceback. We see that there were too many entries for a key/value pair, but that is all the information we receive.

Now let’s contrast this with testing using MockRDD.

from mockrdd import MockRDD

from invalid_key_value_pairs import job

logs = ['server0,1539015865,127.0.0.1,/index.html']

results = job(MockRDD.from_seq(logs))
print(results)

This results in the exception:

ValueError: ('server0', '127.0.0.1', '/index.html') is not a key/value pair

This gives us more information for debugging the specific issue in our code.
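For reference, one possible fix is to group the remote IP and URI into a single value so the mapper emits a genuine key/value pair. The original intent of the pairing isn’t stated above, so treat this as a sketch:

```python
# One possible fix: emit a genuine (key, value) pair, grouping the remote
# IP and URI into a single value. The intended pairing is an assumption.
def get_server_ip_uri(log_entry):
    return log_entry['server_name'], (log_entry['remote_ip'], log_entry['uri'])

pair = get_server_ip_uri(dict(server_name='server0',
                              remote_ip='127.0.0.1',
                              uri='/index.html'))
assert pair == ('server0', ('127.0.0.1', '/index.html'))
```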

Example: Ensuring Operators are Callable

MockRDD also ensures that callables are passed in where expected. These checks occur during RDD construction to ensure we catch these issues as soon as possible. In contrast, in PySpark, passing in a non-callable will cause a runtime error when we try to call the object.
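The idea behind such a construction-time check can be sketched in a few lines of plain Python; this is an illustration, not MockRDD’s actual implementation:

```python
# Illustrative sketch of an eager callable check, not MockRDD's real code.
def checked_map(func, seq):
    # Validate at "RDD construction" time, rather than when the
    # pipeline is finally evaluated.
    if not callable(func):
        raise TypeError(f'{func!r} is not callable')
    return map(func, seq)

try:
    checked_map('server_name', [1, 2, 3])
except TypeError as e:
    error = str(e)
assert 'is not callable' in error
```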

Consider the following buggy code.

from operator import itemgetter
from pyspark import RDD

from util import parse_log_entry

def count_distinct_servers(rdd: RDD) -> int:
    return (rdd
            .map(parse_log_entry)
            .map('server_name') # should be itemgetter('server_name')
            .distinct()
            .count())

Testing with the following PySpark code results in a TypeError.

from pyspark import SparkContext

from invalid_callable import count_distinct_servers

logs = ['server0,1539015865,127.0.0.1,/index.html',
        'server0,1539015866,127.0.0.1,/index.html']

sc = SparkContext()
results = count_distinct_servers(sc.parallelize(logs))
print(results)

This produces the error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
… line 236, in mergeValues
    for k, v in iterator:
TypeError: 'str' object is not callable

Again, this is just a small part of the long traceback that occurs when running the code. From this stack trace, it can be difficult to track down where the invalid non-callable object was passed in.

The MockRDD equivalent is:

from mockrdd import MockRDD

from invalid_callable import count_distinct_servers

logs = ['server0,1539015865,127.0.0.1,/index.html',
        'server0,1539015866,127.0.0.1,/index.html']

results = count_distinct_servers(MockRDD.from_seq(logs))
print(results)

This gives us the following error while constructing the RDD:

Traceback (most recent call last):
  File "invalid_callable.py", line 10, in count_distinct_servers
    .map('server_name') # should be itemgetter('server_name')
TypeError: 'server_name' is not callable

The stack trace now includes the specific place where we passed in the non-callable object.

Example: Ensuring Sequences are Provided in “Flat” Operations

Another type of sanity check concerns operations that are expected to return a sequence; e.g., RDD.flatMap. Consider the following case where we accidentally use flatMap instead of map.
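In plain Python terms, flatMap expects each per-element result to be iterable so the results can be concatenated; the following conceptual sketch (not PySpark’s implementation) shows why a non-iterable result fails:

```python
from itertools import chain

# Conceptual sketch of flatMap semantics, not PySpark's implementation.
def flat_map(func, seq):
    # Each func(x) must be iterable; the iterables are concatenated.
    return list(chain.from_iterable(func(x) for x in seq))

# A function returning sequences works:
assert flat_map(lambda x: [x, x], [1, 2]) == [1, 1, 2, 2]

# A function returning a non-iterable fails, mirroring the mistake
# in the example below.
try:
    flat_map(lambda x: x, [1, 2])
except TypeError as e:
    error = str(e)
assert 'not iterable' in error
```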

from operator import itemgetter
from pyspark import RDD

from util import parse_log_entry

def count_distinct_timestamps(rdd: RDD) -> int:
    return (rdd
            .map(parse_log_entry)
            .flatMap(itemgetter('timestamp'))
            .distinct()
            .count())

Using testing code similar to that of the previous example, the PySpark error message is:

TypeError: 'datetime.datetime' object is not iterable

Whereas the MockRDD error message is:

TypeError: datetime.datetime(2018, 10, 8, 9, 24, 25) returned by operator.itemgetter('timestamp') is not iterable

Again, the improved error messages make it easier for us to debug issues.

Running with PDB

Another benefit of MockRDD is that it’s straightforward to use within the Python debugger, pdb. This is not straightforward for PySpark in local mode, because the executor runs the code in a separate Python process.

python -m pdb mockrdd_test_invalid_key_value_pairs.py
> /Users/matt/ds/mockrdd/examples/mockrdd_test_invalid_key_value_pairs.py(1)<module>()
-> from mockrdd import MockRDD
(Pdb) c
Traceback (most recent call last):
ValueError: ('server0', '127.0.0.1', '/index.html') is not a key/value pair
Uncaught exception. Entering post mortem debugging
Running 'cont' or 'step' will restart the program
> /Users/matt/ds/mockrdd/mockrdd/__init__.py(61)unpack_key_value_pair()
-> raise ValueError(f"{entry} is not a key/value pair")
(Pdb)

This allows you to interactively inspect your running code, setting breakpoints and examining uncaught exceptions post mortem.

Speed

There is a certain overhead with using PySpark, which can be significant when quickly iterating on unit tests or running a large test suite. In these examples, the PySpark local mode version takes approximately 5 seconds to run whereas the MockRDD one takes ~0.3 seconds.

Conclusions

We hope you find MockRDD useful for testing your PySpark code. Let us know if you have any ideas for how we could improve MockRDD; pull requests are certainly welcome.