Garbage in, Never out

Mercury's engineering team prides itself on keeping infrastructure as simple as possible, for as long as possible. One particular system we've developed is called Postgresque, a queuing system that leverages PostgreSQL, which we use to process asynchronous work. From this queue, a set of workers pulls jobs and processes them in a timely fashion. On the whole, this has worked well for our needs: traditional banking industry infrastructure runs relatively slowly, so we're optimizing for stability and fewer moving parts rather than maximum throughput.

That's not to say that throughput doesn't matter: as Mercury's traffic and number of processed payments increased, we noticed that our worker processes slowed down the longer they ran. Many engineers consider using PostgreSQL as a queuing mechanism to be an antipattern, or at the very least a footgun. I'll confess that as we watched our workers crawl to a halt periodically, I suspected the same thing. However, we spent some time optimizing the SQL queries and minimizing queue contention, and in the end we still observed the same ongoing slowdowns.

One peculiarity of the gradual slowdowns was that the workers would process jobs significantly faster after a fresh deployment. As a stopgap, we set up periodic restarts of the workers to keep asynchronous task throughput under control, but this also pointed to a rather different category of problem: memory leaks 😱

For context: Haskell's compiler, GHC, uses a generational copying garbage collector by default. The Haskell garbage collector is stop-the-world: user threads are paused by the GHC runtime system while the garbage collector scans the heap. In order to perform garbage collection, the runtime system must scan all live data in the heap, so as the amount of live data grows, the process runs more slowly because more of its time is spent performing garbage collection. Memory leaks in a long-running Haskell process don't just use RAM; they slow down user code more and more as time goes by.1

Long-lived leaks tend to be an issue that new Haskell programmers run into as they learn the ins and outs of lazy functional programming, but for web application development, we don't tend to run into these sorts of issues all that often due to the request/response lifecycle of HTTP servers.

Thus began a fascinating dive into finding and fixing memory consumption issues, both in our own code and in the larger Haskell ecosystem.

Understand the semantics of the libraries you use

For our worker processes, the code we execute to boot and process our queued jobs looked something like this2:

launchWorkerThreads :: ShutdownHandler -> ResourceT (ReaderT App IO) [ThreadId]
launchWorkerThreads shutdownHandler = forM [1 .. numWorkers] $ \_ ->
  resourceForkIO $ do
    forever $ do
      shutdownIfNecessary shutdownHandler
      maybeJob <- runDB tryDequeueJob
      case maybeJob of
        Nothing -> exponentialBackoff
        Just job -> processJob job

main = do
  app <- initializeApp
  handler <- createShutdownHandler
  let runApp m = runReaderT m app
  tIds <- runApp $ runResourceT $ launchWorkerThreads handler
  awaitShutdownSignal
  sendWorkerShutdown handler

At first glance, this code might not seem problematic, but there are several subtle issues hiding here.

forever sometimes causes space leaks

As of base-4.9.0.0, the forever function's implementation switched from a Monad constraint to Applicative. Due to the way that Applicative and Functor provide default implementations, Applicative instances that don't override either <$ or *> build up additional redundant thunks on every iteration3. This has been fixed for a number of the common Applicative instances in base, transformers, and mtl (transitively from transformers), but it's worth watching out for if you use forever with other Applicative instances, as they may not define <$ or *>.
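
To make the thunk buildup concrete, here is roughly how forever is defined in base since 4.9, together with the class defaults it falls back on when an instance overrides neither method:

forever :: Applicative f => f a -> f b
forever a = let a' = a *> a' in a'

-- Relevant class defaults:
--   a1 *> a2 = (id <$ a1) <*> a2
--   (<$)     = fmap . const
--
-- For an instance that overrides neither (*>) nor (<$), every trip
-- around the loop wraps the action in another fmap/const layer, and
-- those layers accumulate as thunks that are never forced or collected.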

We got a fix into resourcet-1.2.4.3 for this, but in the end it didn't fully solve the problem. Memory continued to leak, so we took another look.

In reality, computations in the ResourceT monad are meant to exit in a timely fashion, so the real answer was to not use forever with ResourceT in the first place.

The patient says, 'Doctor, it hurts when I do this.'

The doctor says, 'Then don't do that!'

― Henny Youngman

The fix requires understanding the basic semantics of ResourceT:

Simply put, ResourceT is a monad transformer which creates a region of code where you can safely allocate resources. Internally, it's represented as a mutable map of resource keys to cleanup functions provided for allocated resources.
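
As a rough mental model (a simplified sketch, not the library's actual types), you can picture it as a reader over a mutable map of cleanup actions:

import Data.IntMap.Strict (IntMap)
import Data.IORef (IORef)

-- Simplified stand-ins for resourcet's internals: each registered
-- resource gets a key pointing at its cleanup action.
type CleanupMap = IntMap (IO ())

-- The transformer threads a mutable reference to that map through
-- the wrapped computation.
newtype ResourceTSketch m a = ResourceTSketch (IORef CleanupMap -> m a)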

It provides a few basic operations:

allocate lets you acquire a resource; you can either release it explicitly when you're done using it, or the resource will be released automatically when the region evaluated with runResourceT and all of its calls to resourceForkIO complete.

allocate
  :: MonadResource m
  => IO a          -- allocate a resource
  -> (a -> IO ())  -- free resource
  -> m (ReleaseKey, a)

release
  :: MonadIO m
  => ReleaseKey -- Use the returned resource key from allocate
                -- to clean up the associated resource early.
  -> m ()

resourceForkIO is used to share the internal resource management map safely across forked threads to ensure that a resource isn't freed until all threads have exited.
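
Here's a minimal sketch of those operations in action; readFirstLine is a made-up helper, with openFile/hClose standing in for any acquire/cleanup pair:

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (allocate, release, runResourceT)
import System.IO (IOMode (ReadMode), hClose, hGetLine, openFile)

-- Read the first line of a file; the handle is closed whether or not
-- we call release explicitly.
readFirstLine :: FilePath -> IO String
readFirstLine path = runResourceT $ do
  (key, handle) <- allocate (openFile path ReadMode) hClose
  line <- liftIO (hGetLine handle)
  -- Optional early cleanup; if we skip this, runResourceT runs the
  -- cleanup action when the region exits.
  release key
  pure line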

Going back to the leaky worker implementation, can you see the problem?

The issue is that every worker is sharing the same resource map! Since calling release on resources is optional in a ResourceT region, any code run in processJob that allocates resources might not free them until all worker threads exit, which they won't do until the program execution is complete.

Luckily, the fix to the original code is relatively simple! The key is that we need to perform the runResourceT call within each round of the forever loop. This ensures that resources used in each call to processJob are promptly finalized rather than waiting for the infinite loop to exit.

launchWorkerThreads :: ShutdownHandler -> ReaderT App IO [ThreadId]
launchWorkerThreads shutdownHandler = forM [1 .. numWorkers] $ \_ ->
  forkIO $ do
    forever $ do
      shutdownIfNecessary shutdownHandler
      -- runResourceT inside the loop: resources allocated while
      -- processing a job are freed as soon as that iteration finishes.
      runResourceT $ do
        maybeJob <- runDB tryDequeueJob
        case maybeJob of
          Nothing -> exponentialBackoff
          Just job -> processJob job

main = do
  app <- initializeApp
  handler <- createShutdownHandler
  let runApp m = runReaderT m app
  tIds <- runApp $ launchWorkerThreads handler
  awaitShutdownSignal
  sendWorkerShutdown handler

Therefore the Master

acts without doing anything

and teaches without saying anything.

Things arise and she lets them come;

things disappear and she lets them go.

She has but doesn't possess,

acts but doesn't expect.

When her work is done, she forgets it.

That is why it lasts forever.

― Tao Te Ching

Fixing this issue, however, only slowed the memory leak. We needed to dig deeper to figure out what else wasn't working.

When helping hurts

Starting with this context as a jumping-off point, we profiled running 100,000 no-op tasks to see what was still eating memory:

Adversarial input heap profile

Further digging into heap profiling cost centers (not shown here) surfaced a curious culprit... our metrics library! In an unfortunate turn of events, the instrumentation in place to ensure that our queuing systems behaved as they were supposed to turned out to be the code that was leaking memory.4

So where did all the memory go? For tracking metrics and running alerting at Mercury, we use Prometheus and Grafana, sending data via the prometheus-client library. One of the primary metric types it offers is the summary, which emits multiple time series values commonly needed for application monitoring:

  • Total sum of all observed values
  • Count of observed events
  • Streaming percentiles

Sum and count of observed values are fairly straightforward metrics to track, but what is a streaming percentile?

Streaming percentile algorithms are approximations of percentile ranks, and are frequently used to track measurements such as response latency for network requests (among other things).

Latency is typically reported at the 50th, 90th, 95th, and 99th percentiles, commonly referred to as p50, p90, p95, and p99. Imagine 10 latency measurements: 1, 2, 5, 5, 18, 25, 33, 36, 122, and 1000 milliseconds (ms). The p50 measurement represents the median performance of the system; in this case the p50 is 18 ms, meaning 50% of users experienced that latency or less. The p90 measurement is 122 ms, meaning that 9 of the 10 latencies measured 122 ms or less. Tracking these outliers is useful both as a leading indicator of system degradation and as a signal that users are experiencing really slow interactions on our website, so it's something we want to keep a close eye on.5

Traditional percentile rank calculation requires us to track every single observed value in order to calculate e.g. the p95 of the observed values. In a long-running process, however, we can't afford to retain all measurements for the lifetime of the program! We need some way of discarding most of the observed values that we receive while still returning reasonably accurate percentile values.
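
For illustration, here's what the naive, non-streaming calculation looks like (a hypothetical nearest-rank helper, not anything from prometheus-client); note that it has to keep every observation in memory and sort them all:

import Data.List (sort)

-- Naive nearest-rank percentile: retains and sorts every observation.
percentile :: Double -> [Double] -> Double
percentile p xs = sort xs !! idx
  where
    n   = length xs
    idx = min (n - 1) (ceiling (p / 100 * fromIntegral n) - 1)

-- percentile 50 [1,2,5,5,18,25,33,36,122,1000] == 18.0
-- percentile 90 [1,2,5,5,18,25,33,36,122,1000] == 122.0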

The existing Prometheus client libraries in most languages tend to implement an algorithm often referred to as the CKMS streaming quantile algorithm6, and the prometheus-client Haskell library prior to our changes was no exception. The original CKMS paper provides multiple variants; the one typically used for Prometheus clients is the "targeted quantiles" variant, which allows consumers of the algorithm to specify the acceptable error rate for certain percentiles.
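
As a concrete example, a targeted-quantiles specification looks roughly like this (illustrative values in the pair-of-quantile-and-allowed-error shape that prometheus-client uses; these are not the library's defaults):

-- Each entry pairs a target quantile with the rank error (as a fraction
-- of the total observation count) we're willing to tolerate for it.
-- Tighter tolerances force the algorithm to retain more samples near
-- the corresponding quantile.
targetedQuantiles :: [(Rational, Rational)]
targetedQuantiles =
  [ (0.5,  0.05)   -- p50, rank within ±0.05·n of the true median
  , (0.9,  0.01)   -- p90, rank within ±0.01·n
  , (0.99, 0.001)  -- p99, rank within ±0.001·n
  ]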

What counts as an "adversarial input"? We discovered that observing the same value repeatedly, or observing monotonically increasing or decreasing values, led to linear memory usage:

filledEstimator = foldr insert emptyEstimator $ replicate 100_000 1
length $ estItems $ compress filledEstimator

Returns:

100000

An Estimator is (was) the internal data structure used by prometheus-client to track the observed data points. In the code above, we observe the value 1 a hundred thousand times, then count the retained items after a compress, the operation provided by the CKMS algorithm to discard some values from the retained observation set. Compression should eliminate any unnecessary values while still providing approximately correct quantile information, but the invariants that the compressed data set needs to maintain prevent the algorithm from dropping any of the values in this case. This led to our surprising space leak in production.

We didn't know enough about the algorithm to fully understand whether the leak in question was a bug particular to the prometheus-client implementation of CKMS targeted quantiles. To find out, we ran the same adversarial input case (repeated sampling of the value 1) against other clients, and found that numerous other language implementations suffered from the same problem.

Ultimately, thanks to a footnote in a Rust implementation, we discovered that the targeted quantiles algorithm itself fundamentally didn't work, so we decided to pursue more recent research from the authors of the CKMS algorithm. One of the authors, Graham Cormode, kindly provided us with a link to the latest work in this line7, which thankfully also had publicly available code under the Apache Software Foundation as part of the DataSketches project. In the new implementation, a summary is referred to as a ReqSketch, which stands for "Relative Error Quantile Sketch". We are excited to announce that we have ported the ReqSketch implementation from Java to Haskell and published it as a package on Hackage8.

Benchmarks in the general case show dramatic performance increases (~257x) for the new ReqSketch implementation over the existing Prometheus code and, happily, significantly better memory usage on unbounded observation streams.

We also ran tests to ensure sublinear growth, and in the adversarial case of repeated inserts of the same value, we are seeing the desired behaviour for the new implementation:

sk <- mkReqSketch 6 HighRanksAreAccurate
replicateM_ 100_000_000 (insert sk 1)
print =<< getRetainedItems sk

Returns:

937

Notice that this time we inserted 100 million items, rather than 100 thousand as in the first example!

Even better, this work has been upstreamed into the prometheus-client library as of 1.1.0, so if you are a consumer of the prometheus-client package, we highly recommend upgrading to the latest version.

Comparing our workers locally running the new algorithm, we're seeing consistent garbage collection and memory usage when processing webhooks. This is the same 100,000-webhook workload as the graph posted before; the only difference is the use of the new streaming percentile algorithm!

Fixed heap profile

Miscellaneous takeaways

  • If you run any sort of long-running Haskell system in production, make sure you track RTS metrics in some sort of monitoring system (see the sketch after this list). It would have taken far longer to get to the root of the issue if we couldn't examine our processes' heap size and garbage collection behaviour in real time.
  • Researchers are really nice and want to share their work with you. I'd never emailed a researcher about their work before, but Graham really helped us to crack the case on a problem for which we didn't have much domain expertise.
  • If you liked this post, why not come work at Mercury?
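
On the first point: if you want to poll the RTS from inside your own process, a minimal sketch using GHC.Stats looks something like this (reportRtsStats is a made-up helper; the program must be started with +RTS -T so the runtime collects statistics):

import GHC.Stats (GCDetails (..), RTSStats (..), getRTSStats, getRTSStatsEnabled)

-- Print a couple of headline GC numbers; in practice you'd ship these
-- to your monitoring system rather than stdout.
reportRtsStats :: IO ()
reportRtsStats = do
  enabled <- getRTSStatsEnabled
  if enabled
    then do
      stats <- getRTSStats
      putStrLn $ "live bytes after last GC: " <> show (gcdetails_live_bytes (gc stats))
      putStrLn $ "major GCs so far:         " <> show (major_gcs stats)
    else putStrLn "RTS stats are disabled; start the program with +RTS -T"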

Footnotes

  1. Channable has a great writeup on Haskell's garbage collection mechanism.

  2. Code changed to protect the innocent.

  3. You have to manually expand out a few rounds of loops to really see the issue, exercise left to the reader and all that.

  4. https://www.youtube.com/watch?v=8JOpPNra4bw

  5. How NOT to measure latency

  6. Effective Computation of Biased Quantiles over Data Streams

  7. Relative Error Streaming Quantiles

  8. data-sketches Hackage package