Mercury's engineering team prides itself on keeping infrastructure as simple as possible, for as long as possible. One system we've developed is called Postgresque, a queuing system built on PostgreSQL that we use to process asynchronous work. A set of workers pulls jobs from this queue and processes them in a timely fashion. On the whole, this has worked well for our needs: traditional banking infrastructure runs relatively slowly, so we're optimizing for stability and fewer moving parts rather than maximum throughput.
That's not to say that throughput doesn't matter. As Mercury's traffic and number of processed payments increased, we noticed ongoing slowdowns in our worker processes the longer they ran. Many engineers consider using PostgreSQL as a queuing mechanism to be an antipattern, or at the very least a footgun, and I'll confess that as we watched our workers periodically crawl to a halt, I suspected the same thing. However, even after we spent time optimizing the SQL queries and minimizing queue contention, the slowdowns continued.
One peculiarity of the gradual slowdowns was that the workers would process jobs significantly faster after a fresh deployment. As a stopgap, we set up periodic restarts of the workers to keep asynchronous task throughput under control, but this also pointed to a rather different category of problem: memory leaks 😱
For context: Haskell's compiler, GHC, uses a generational copying garbage collector by default. The collector is implemented in a stop-the-world fashion: user threads are paused by the GHC runtime system while the garbage collector scans the heap. To perform garbage collection, the runtime system must scan all live data in the heap, so as the heap grows, the process spends more and more of its time performing garbage collection. Memory leaks in a long-running Haskell process therefore don't just consume RAM; they slow down user code more and more as time goes by.1
Long-lived leaks tend to be an issue that new Haskell programmers run into as they learn the ins and outs of lazy functional programming, but for web application development, we don't tend to run into these sorts of issues all that often due to the request/response lifecycle of HTTP servers.
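As an illustration of the kind of leak lazy evaluation can produce (a classic textbook example, not code from our workers): a lazy left fold accumulates a chain of unevaluated thunks on the heap, while its strict counterpart runs in constant space.

```haskell
import Data.List (foldl')

-- Lazy foldl builds up one thunk per element: (((0 + 1) + 2) + ...),
-- all of which stay live until the final result is demanded.
leakySum :: Int
leakySum = foldl (+) 0 [1 .. 1000000]

-- foldl' forces each intermediate accumulator, so it runs in constant space.
strictSum :: Int
strictSum = foldl' (+) 0 [1 .. 1000000]

main :: IO ()
main = print (leakySum, strictSum)
```

Both folds compute the same answer; the difference only shows up in heap residency, which is exactly what makes these leaks easy to miss in short-lived processes.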
Thus began a fascinating dive into finding and fixing memory consumption issues, both in our own code and in the larger Haskell ecosystem.
Understand the semantics of the libraries you use
For our worker processes, the code we execute to boot and process our queued jobs looked something like this2:
```haskell
launchWorkerThreads :: ShutdownHandler -> ResourceT (ReaderT App IO) [ThreadId]
launchWorkerThreads shutdownHandler =
  forM [1 .. numWorkers] $ \_ ->
    resourceForkIO $ do
      forever $ do
        shutdownIfNecessary shutdownHandler
        maybeJob <- runDB tryDequeueJob
        case maybeJob of
          Nothing -> exponentialBackoff
          Just job -> processJob job

main = do
  app <- initializeApp
  handler <- createShutdownHandler
  let runApp m = runReaderT m app
  tIds <- runApp $ runResourceT $ launchWorkerThreads handler
  awaitShutdownSignal
  sendWorkerShutdown handler
```
On first glance, this code might not seem immediately problematic, but there are several subtle issues hiding here.
forever sometimes causes space leaks
At some point, the `forever` function's implementation switched from a `Monad` constraint to `Applicative`. Due to the way that `Functor` and `Applicative` provide default implementations, `Applicative` instances that don't explicitly define `*>` to replace the default definition build up additional redundant thunks on each iteration3. This issue has been fixed for a number of the common `Applicative` instances in `mtl` (transitively from `transformers`), but it's worth looking out for if you use `forever` in tandem with other `Applicative` instances, as they may not define `*>`.
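One way to sidestep the issue entirely is to loop via explicit monadic recursion, so no `Applicative` default methods are involved. A minimal sketch (the `MaybeT` harness below is purely illustrative, just to give the loop a way to terminate):

```haskell
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Maybe (MaybeT (..), runMaybeT)
import Data.IORef

-- Loop using (>>) from the Monad instance rather than (*>),
-- avoiding any problematic Applicative defaults.
foreverM :: Monad m => m a -> m b
foreverM act = let go = act >> go in go

main :: IO ()
main = do
  ref <- newIORef (0 :: Int)
  -- Run the loop in MaybeT IO so we can break out after six iterations.
  _ <- runMaybeT $ foreverM $ do
    n <- lift $ atomicModifyIORef' ref (\n -> (n + 1, n))
    if n >= 5 then MaybeT (pure Nothing) else pure ()
  print =<< readIORef ref  -- prints 6
```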
We got a fix in for `ResourceT` as of resourcet-184.108.40.206, but in the end this didn't fully solve the problem. Memory continued to leak, so we took another look.
In reality, computations in the `ResourceT` monad are meant to exit in a timely fashion, so the real answer was to not use `ResourceT` in a long-lived loop in the first place.
> The patient says, 'Doctor, it hurts when I do this.'
> The doctor says, 'Then don't do that!'
>
> ― Henny Youngman
The fix first requires understanding the basic semantics of `ResourceT`. `ResourceT` is a monad transformer which creates a region of code where you can safely allocate resources. Internally, it's represented as a mutable map from resource keys to the cleanup functions provided for allocated resources.
It provides a few basic operations:
`allocate` lets you acquire a resource, and you can either `release` it when you're done using it, or else the resource will be released automatically when the region evaluated with `runResourceT` exits.
```haskell
allocate
  :: MonadResource m
  => IO a         -- allocate a resource
  -> (a -> IO ()) -- free resource
  -> m (ReleaseKey, a)

release
  :: MonadIO m
  => ReleaseKey -- Use the returned resource key from allocate
                -- to clean up the associated resource early.
  -> m ()
```
`resourceForkIO` is used to share the internal resource-management map safely across forked threads, ensuring that a resource isn't freed until all threads have exited.
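To make the lifecycle concrete, here's a minimal sketch of `allocate` and `release` in action (the file path is just for illustration):

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (allocate, release, runResourceT)
import System.IO

main :: IO ()
main = runResourceT $ do
  -- Register the handle in the resource map; hClose is its cleanup action.
  (key, h) <- allocate (openFile "example.txt" WriteMode) hClose
  liftIO $ hPutStrLn h "hello"
  -- Free the handle early; without this, runResourceT closes it on exit.
  release key
```

Even if an exception interrupts the region, `runResourceT` still runs the registered cleanup actions, which is the whole point of the abstraction.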
Going back to the leaky worker implementation, can you see the problem?
The issue is that every worker is sharing the same resource map! Since calling `release` on resources is optional within a `ResourceT` region, any code run in `processJob` that allocates resources might not free them until all worker threads exit, which they won't do until program execution is complete.
Luckily, the fix to the original code is relatively simple! The key is to perform the `runResourceT` call within each round of the `forever` loop. This ensures that resources used in each call to `processJob` are promptly finalized rather than waiting for the infinite loop to exit.
```haskell
launchWorkerThreads :: ShutdownHandler -> ReaderT App IO [ThreadId]
launchWorkerThreads shutdownHandler =
  forM [1 .. numWorkers] $ \_ ->
    forkIO $ do
      forever $ do
        shutdownIfNecessary shutdownHandler
        maybeJob <- runDB tryDequeueJob
        case maybeJob of
          Nothing -> exponentialBackoff
          Just job -> runResourceT $ processJob job

main = do
  app <- initializeApp
  handler <- createShutdownHandler
  let runApp m = runReaderT m app
  tIds <- runApp $ launchWorkerThreads handler
  awaitShutdownSignal
  sendWorkerShutdown handler
```
> Therefore the Master
> acts without doing anything
> and teaches without saying anything.
> Things arise and she lets them come;
> things disappear and she lets them go.
> She has but doesn't possess,
> acts but doesn't expect.
> When her work is done, she forgets it.
> That is why it lasts forever.
>
> ― Tao Te Ching
Fixing this issue, however, only slowed our memory leak situation. We needed to dig deeper to figure out what else wasn't working.
When helping hurts
Starting with this context as a jumping off point, we profiled running 100,000 no-op tasks to see what was still eating memory:
Further digging into heap profiling cost centers (not shown here) surfaced a curious culprit... our metrics library! In an unfortunate turn of events, the instrumentation in place to ensure that our queuing systems behaved as they were supposed to turned out to be the code that was leaking memory.4
So where did all the memory go? For tracking metrics and alerting at Mercury, we use Prometheus and Grafana, sending data via the `prometheus-client` library. One of the primary metric types it offers is the summary, which emits multiple time series values commonly needed for application monitoring:
- Total sum of all observed values
- Count of observed events
- Streaming percentiles
Sum and count of observed values are fairly straightforward metrics to track, but what is a streaming percentile?
Streaming percentile algorithms compute approximations of percentile ranks, and are frequently used to track measurements such as response latency for network requests (among other things).
Latency is typically reported at the 50th, 90th, 95th, and 99th percentiles, commonly referred to as p50, p90, p95, and p99. Imagine 10 latency measurements: 1, 2, 5, 5, 18, 25, 33, 36, 122, and 1000 milliseconds (ms). The p50 measurement represents the median performance of the system. In this case, the p50 measurement is 18 ms, meaning 50% of users experienced that latency or less. The p90 measurement is 122 ms, meaning that 90% of the measured latencies were at or below 122 ms. Tracking outliers is useful as a leading indicator of system degradation and of whether users are experiencing really slow interactions on our website, so it's something we want to keep a close eye on.5
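For intuition, the nearest-rank percentiles of that small sample can be computed directly; this naive, retain-everything version is what a streaming summary approximates:

```haskell
import Data.List (sort)

-- Naive nearest-rank percentile: sort all samples and index in.
-- A streaming summary approximates this without retaining every value.
percentile :: Double -> [Double] -> Double
percentile p xs = sort xs !! idx
  where
    idx = min (length xs - 1) (ceiling (p / 100 * fromIntegral (length xs)) - 1)

latencies :: [Double]
latencies = [1, 2, 5, 5, 18, 25, 33, 36, 122, 1000]

main :: IO ()
main = mapM_ print [percentile p latencies | p <- [50, 90, 95, 99]]
-- prints 18.0, 122.0, 1000.0, 1000.0
```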
Traditional percentile rank calculation requires us to track every single observed value in order to calculate e.g. the p95 of the observed values. In a long-running process, however, we can't afford to retain all measurements for the lifetime of the program! We need some way of discarding most of the observed values that we receive while still returning reasonably accurate percentile values.
The existing Prometheus client libraries in most languages tend to implement an algorithm often referred to as the CKMS streaming quantile algorithm6, and the `prometheus-client` Haskell library prior to our changes was no exception. The original CKMS paper provides multiple variants; the one typically used for Prometheus clients is the "targeted quantiles" variant, which allows consumers of the algorithm to specify the acceptable error rate for certain percentiles.
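A targeted-quantiles configuration is just a list of (quantile, acceptable error) pairs. A hypothetical sketch (the names here are ours, not the library's):

```haskell
-- A target pairs a quantile with its allowed rank error, e.g. the p99
-- estimate may be off by at most 0.001 in rank.
type Target = (Double, Double)

-- Typical defaults seen across Prometheus client libraries.
targets :: [Target]
targets = [(0.5, 0.05), (0.9, 0.01), (0.99, 0.001)]

main :: IO ()
main = mapM_ (\(q, e) -> putStrLn (show q <> " within " <> show e)) targets
```

The looser the allowed error for a quantile, the more aggressively the algorithm may discard observations near it.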
Unfortunately, the targeted quantiles variant degrades badly on adversarial inputs. What counts as an "adversarial input"? We discovered that observing the same value repeatedly, or observing monotonically increasing or decreasing values, led to linear memory usage:
```haskell
filledEstimator = foldr insert emptyEstimator $ replicate 100_000 1

length $ estItems $ compress filledEstimator
```
`Estimator` is (was) the internal data structure used by `prometheus-client` to track the observed data points. In the above code, we observe the value `1` 100,000 times, then calculate the number of retained items after a `compress`, the operation provided by the CKMS algorithm to discard values from the retained observation set. Compression should eliminate any unnecessary values while still providing approximately correct quantile information, but the invariants that the compressed data set needs to maintain prevent the algorithm from dropping any of the values in this case. This led to our surprising space leak in production.
We didn't know enough about the algorithm to tell whether the leak was a bug particular to the `prometheus-client` implementation of CKMS targeted quantiles.
To find out, we ran the same adversarial input case (repeated sampling of the value `1`) against numerous other language implementations, and found that they suffered from the same problem.
Ultimately we discovered, thanks to a footnote in a Rust implementation, that the targeted quantiles algorithm itself fundamentally didn't work, so we decided to pursue more recent research from the authors of the CKMS algorithm. One of the authors, Graham Cormode, kindly provided us with a link to the latest work in this line7, which thankfully also had publicly available code under the Apache Software Foundation as part of the DataSketches project. In the new implementation, a summary is referred to as a ReqSketch, which stands for "Relative Error Quantile sketch". We are excited to announce that we ported the ReqSketch implementation from Java to Haskell and have published it as a package on Hackage8.
Benchmarks in the general case show dramatic performance increases (~257x) for the new implementation (`ReqSketch`) over the existing Prometheus code and, happily, significantly better memory usage on unbounded observation streams.
We also ran tests to ensure sublinear growth; in the adversarial case of repeated inserts of the same value, the new implementation shows the desired behaviour:
```haskell
sk <- mkReqSketch 6 HighRanksAreAccurate
replicateM_ 100_000_000 (insert sk 1)
print =<< getRetainedItems sk
```
Notice that this time we inserted 100 million items, rather than 100 thousand as in the first example!
Even better, this work has been upstreamed into the `prometheus-client` library; if you are a consumer of the `prometheus-client` package, we highly recommend upgrading to the latest version.
Comparing our workers locally running the new algorithm, we're seeing consistent garbage collection and memory usage when processing webhooks. This is the same 100,000-webhook workload as in the graph above; the only difference is the new streaming percentile algorithm!
- If you run any sort of long-running Haskell system in production, make sure you track RTS metrics in some sort of monitoring system. It would have taken far longer to get to the root of the issue if we couldn't examine our processes' heap size and garbage collection in real time.
- Researchers are really nice and want to share their work with you. I'd never emailed a researcher about their work before, but Graham really helped us to crack the case on a problem for which we didn't have much domain expertise.
- If you liked this post, why not come work at Mercury?
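For reference, GHC exposes those RTS metrics in-process via `GHC.Stats`; the program must be run with `+RTS -T` for the stats to be populated. A minimal sketch:

```haskell
import Control.Monad (when)
import GHC.Stats

main :: IO ()
main = do
  enabled <- getRTSStatsEnabled  -- True only when run with +RTS -T
  when enabled $ do
    stats <- getRTSStats
    putStrLn $ "major GCs:      " <> show (major_gcs stats)
    putStrLn $ "max live bytes: " <> show (max_live_bytes stats)
```

In practice you'd periodically sample fields like these and ship them to your metrics system rather than printing them.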
Code changed to protect the innocent. ↩
You have to manually expand a few rounds of the loop to really see the issue; exercise left to the reader and all that. ↩
- GHC issue
- Mailing list comments discussing the issue: