At LiveRamp, most of our heavy data processing is done by MapReduce jobs on our Hadoop YARN cluster. Since these jobs are critical to our data processing workflows, one of our top priorities is making sure they run quickly and reliably.
In the early days of LiveRamp, our cluster processed on the order of dozens of jobs a day, and it was practical to manually monitor each one for performance issues. As the company has grown, however, so has the volume of jobs on our cluster; we now average over 12,000 MapReduce jobs a day. Since the vast majority of jobs run automatically and aren’t human-supervised, it’s easy not to notice degraded performance until a job (a) actually fails, or (b) runs so slowly that it threatens to miss customer deadlines. We hate it when either of those happens, so our engineering team has built tools to detect MapReduce job problems before they become critical.
This post describes the tools Hadoop provides to help developers debug performance problems in MapReduce jobs, and how we incorporated them into automatic alerts. While we only cover memory- and CPU-related counters here, counters are also invaluable for tracking file sizes, NameNode load, and much else.
One very valuable resource Hadoop provides for debugging is Counters. Each task reports counters for a wide variety of statistics, which are aggregated per job by the ResourceManager. Counters can be viewed on the ResourceManager application page:
While the ResourceManager is a handy place to view counters when debugging manually, it doesn’t store job history long-term and isn’t suited to complex queries (plus, we’d like to avoid DDoSing our ResourceManager). Luckily, counters can easily be retrieved using either the Hadoop or Cascading APIs (the vast majority of our MapReduce jobs use the Cascading framework). The helper methods we use for counter retrieval are open source, part of our cascading_ext library.
We launch almost all our MapReduce jobs using an internal library we creatively call the “workflow framework.” By funneling all our MapReduce job creation through this tool, we are able to automatically fetch and store every counter for every job launched at LiveRamp. So for any job run on our cluster, we are able to fetch not just the counter value, but the history of that counter in previous runs:
This is great for manual debugging. But our original goal was not to react to job failures, but to actually pre-empt them. Luckily, all this tracking gave us a huge corpus of data to play with, and it turned out we could identify important categories of pending problems using just MapReduce counters.
Heap Usage vs Garbage Collect Time
Figuring out how much heap space a map or reduce task should be allocated is a dark art. On one hand, over-allocated memory means YARN does not pack as many tasks onto a NodeManager as it can, and the cluster is under-utilized. On the other hand, if you don’t give a task enough heap space, you get two Very Bad Problems:
- Explicit crashes due to Out Of Memory Errors (OOMEs). While explicit failure is usually recoverable, for time critical processes even a single failed job can be a problem. But worse is
- Garbage Collect (GC) time. The closer a Java application gets to full heap utilization, the more often the JVM has to run Full Garbage Collections, blocking any other work and using large amounts of CPU.
By default, the JVM does not fail an application until a full 98% of its time is spent in garbage collection. In other words, job tasks can waste vast amounts of time in GC, and without explicit monitoring, nobody will ever know.
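This threshold comes from HotSpot’s GC overhead limit, which is tunable via JVM flags (the values shown are the HotSpot defaults):

```
-XX:+UseGCOverheadLimit   # on by default; throws an OOME when GC overhead is excessive
-XX:GCTimeLimit=98        # % of time in GC before the limit trips
-XX:GCHeapFreeLimit=2     # minimum % of heap a full GC must reclaim
```

In practice nobody wants to tune the limit down per job, which is exactly why monitoring GC time directly is more useful than relying on the JVM to fail.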
Luckily, YARN counters have everything we need. Specifically:
- JobCounter.MILLIS_MAPS — wall-time occupied by all map tasks
- JobCounter.MILLIS_REDUCES — wall-time occupied by all reduce tasks
- TaskCounter.GC_TIME_MILLIS — wall-time spent in garbage collection
Calculating % of time spent in GC is simple:
% of time spent in GC = GC_TIME_MILLIS / (MILLIS_MAPS + MILLIS_REDUCES)
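The arithmetic is easy to sketch in a few lines (a standalone Python illustration of the formula above, not our production code):

```python
def gc_fraction(gc_time_millis, millis_maps, millis_reduces):
    """Fraction of total task wall-time spent in garbage collection."""
    return gc_time_millis / (millis_maps + millis_reduces)

# Example: 90 minutes of GC across 10 hours of total task wall-time
print(gc_fraction(90 * 60 * 1000, 8 * 3600 * 1000, 2 * 3600 * 1000))  # 0.15
```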
Calculating how close tasks come to using all of their allocated memory is trickier, but we can still do it. Again we use JobCounter.MILLIS_MAPS and JobCounter.MILLIS_REDUCES, plus:
- JobCounter.TOTAL_LAUNCHED_MAPS/REDUCES — total launched map / reduce tasks
- JobCounter.MB_MILLIS_MAPS/REDUCES — MB*ms of RAM allocated to map / reduce tasks
- TaskCounter.PHYSICAL_MEMORY_BYTES — bytes of RAM used by all tasks (peak per task)
totalTasks = TOTAL_LAUNCHED_MAPS + TOTAL_LAUNCHED_REDUCES
averageTaskMemory = PHYSICAL_MEMORY_BYTES / (totalTasks * 1024 * 1024)
occupiedMemory = (MILLIS_MAPS + MILLIS_REDUCES) * averageTaskMemory
allocatedMemory = MB_MILLIS_MAPS + MB_MILLIS_REDUCES
% of memory allocation used = occupiedMemory / allocatedMemory
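Putting those counters together, here is a hedged Python sketch of the calculation (the counter names match the real Hadoop counters; the dictionary plumbing is just for illustration):

```python
MB = 1024 * 1024

def memory_usage_fraction(counters):
    """Approximate fraction of YARN-allocated memory-time a job's tasks actually used."""
    total_tasks = counters["TOTAL_LAUNCHED_MAPS"] + counters["TOTAL_LAUNCHED_REDUCES"]
    # average peak physical memory per task, in MB
    average_task_memory_mb = counters["PHYSICAL_MEMORY_BYTES"] / (total_tasks * MB)
    # MB*ms actually occupied vs MB*ms allocated by YARN
    occupied = (counters["MILLIS_MAPS"] + counters["MILLIS_REDUCES"]) * average_task_memory_mb
    allocated = counters["MB_MILLIS_MAPS"] + counters["MB_MILLIS_REDUCES"]
    return occupied / allocated

# Example: 10 map tasks peaking around 1536 MB each, with 2048 MB allocated per task
counters = {
    "TOTAL_LAUNCHED_MAPS": 10, "TOTAL_LAUNCHED_REDUCES": 0,
    "PHYSICAL_MEMORY_BYTES": 10 * 1536 * MB,
    "MILLIS_MAPS": 600_000, "MILLIS_REDUCES": 0,
    "MB_MILLIS_MAPS": 2048 * 600_000, "MB_MILLIS_REDUCES": 0,
}
print(memory_usage_fraction(counters))  # 0.75
```

Note this is an approximation: PHYSICAL_MEMORY_BYTES reflects peak usage, so averaging it across tasks smooths over tasks with uneven memory profiles.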
Using the historic data we collected, we were able to graph % memory usage vs % of time spent in garbage collection over a day of jobs, shown below:
This was reassuring, and matched our expectations. While not every job with high memory usage ended up with bad GC, the risk of high GC clearly increases as tasks approach their peak memory. We decided to set up two automatic alerts:
- 20% GC Time — when a job’s tasks spend on average > 20% of their time in GC, we alert the responsible team
- 90% Memory Usage — when the tasks of a job on average use > 90% of their allocated memory, we send an alert
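Those two thresholds reduce to a simple check against each job’s counter-derived ratios. A minimal sketch (the threshold constants and alert names are ours, not a real API):

```python
GC_TIME_THRESHOLD = 0.20       # alert when > 20% of task time is spent in GC
MEMORY_USAGE_THRESHOLD = 0.90  # alert when > 90% of allocated memory is used

def check_job(gc_fraction, memory_fraction):
    """Return the list of alerts a job's counter-derived ratios would trigger."""
    alerts = []
    if gc_fraction > GC_TIME_THRESHOLD:
        alerts.append("high-gc")
    if memory_fraction > MEMORY_USAGE_THRESHOLD:
        alerts.append("high-memory")
    return alerts

print(check_job(0.25, 0.95))  # ['high-gc', 'high-memory']
```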
These have let us identify and fix a number of performance issues which we would not have otherwise noticed.
CPU Usage vs CPU Allocation

CPU allocation in MapReduce jobs is tricky. YARN’s scheduling allows users to allocate resources to a YARN application along two dimensions — memory and CPU. While memory limits are strictly enforced by the NodeManager, CPU limits (requested via the mapreduce.map.cpu.vcores and mapreduce.reduce.cpu.vcores properties) are (surprise!) not enforced by default.
While this does make good sense in terms of maximizing total CPU utilization, we were bitten by this when a rogue application started Garbage Collecting heavily (this was before we had the memory alerts discussed above). Since Java GC is heavily multi-threaded, tasks which were only “allocated” 1 core ended up using 10+ cores to constantly GC, degrading the performance of every other application on our cluster.
To prevent this from happening again, we set up an alert to check for jobs which use significantly more CPU than they requested from YARN. We can do this with JobCounter.VCORES_MILLIS_MAPS/REDUCES plus
- TaskCounter.CPU_MILLISECONDS — total CPU time used by a task
We just want used CPU / allocated CPU, or
CPU_MILLISECONDS / (JobCounter.VCORES_MILLIS_MAPS+JobCounter.VCORES_MILLIS_REDUCES)
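This ratio is again a one-liner; a Python sketch for illustration (a ratio of 1.0 means the job used exactly the CPU it requested):

```python
def cpu_usage_ratio(cpu_millis, vcores_millis_maps, vcores_millis_reduces):
    """CPU time actually burned vs vcore*time allocated by YARN."""
    return cpu_millis / (vcores_millis_maps + vcores_millis_reduces)

# A job allocated 1 vcore per task but GCing on many threads can blow well past 1.0:
print(cpu_usage_ratio(3_000_000, 500_000, 100_000))  # 5.0
```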
Using a day of jobs as data, we charted this to see the distribution:
It turned out that some jobs were using > 5x the CPU they were allocated, degrading the performance of other tasks on those machines. Going forward, we send alerts when jobs use > 2x the CPU they request from YARN.
The counters described above give us one other fun tool — by tracking VCORES_MILLIS_MAPS, VCORES_MILLIS_REDUCES, MB_MILLIS_MAPS and MB_MILLIS_REDUCES (basically, “CPU time” and “memory time” used by a job), plus knowing what we spend to maintain our cluster, we can calculate the dollar cost of every run of a data workflow:
This helps us in two ways:
- It gives our product team a way to judge whether a poorly-optimized job is worth running (at a cost to everything else)
- It gives us a snapshot of whether a job is scaling poorly as LiveRamp grows
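The cost calculation itself can be sketched simply; the per-unit rates below are hypothetical placeholders (in reality they would be derived from what a given cluster costs to run), but the counter inputs are the real ones:

```python
# Hypothetical rates: dollars per MB*ms of memory and per vcore*ms of CPU.
DOLLARS_PER_MB_MILLIS = 1.0e-11
DOLLARS_PER_VCORE_MILLIS = 5.0e-9

def job_cost(mb_millis_maps, mb_millis_reduces, vcores_millis_maps, vcores_millis_reduces):
    """Dollar cost of one job run, priced from its memory-time and CPU-time counters."""
    memory_cost = (mb_millis_maps + mb_millis_reduces) * DOLLARS_PER_MB_MILLIS
    cpu_cost = (vcores_millis_maps + vcores_millis_reduces) * DOLLARS_PER_VCORE_MILLIS
    return memory_cost + cpu_cost

# e.g., a job using 2e12 MB*ms of memory and 1e9 vcore*ms of CPU:
print(round(job_cost(2e12, 0, 1e9, 0), 2))  # 25.0
```

Summing these per-run costs over a workflow’s history is what makes the per-workflow cost and cluster-share views possible.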
This also lets us quickly ask “what in the world is using all of our cluster capacity?” by showing the % of total cluster capacity each workflow uses:
While the tools Hadoop provides to help developers are often arcane, undocumented, and confusing, they can be very valuable when optimizing jobs and debugging problems. These are just a few of the systems we’ve built to help us as we constantly scale up the amount of data we process.
Although most of the tools described here aren’t open source yet, we’re hoping to push many of them to our open source repos in the coming months. But if you don’t want to wait that long, LiveRamp engineering is always hiring.