Introducing “Delayed”: Resilient Background Jobs on Rails
Sep 7, 2021

In the past 24 hours, a Ruby on Rails application at Betterment performed somewhere on the order of 10 million asynchronous tasks. While many of these tasks merely sent a transactional email, or fired off an iOS or Android push notification, plenty involved the actual movement of money—deposits, withdrawals, transfers, rollovers, you name it—while others kept Betterment’s information systems up-to-date—syncing customers’ linked account information, logging events to downstream data consumers, the list goes on.

What all of these tasks had in common (aside from being, well, really important to our business) is that they were executed via a database-backed job-execution framework called Delayed, a newly open-sourced library that we’re excited to announce… right now, as part of this blog post!

And, yes, you heard that right. We run millions of these so-called “background jobs” daily using a SQL-backed queue—not Redis, or RabbitMQ, or Kafka, or, um, you get the point—and we’ve very intentionally made this choice, for reasons that will soon be explained! But first, let’s back up a little and answer a few basic questions.

Why Background Jobs?

In other words, what purpose do these background jobs serve? And how does running millions of them per day help us?

Well, when building web applications, we (as web application developers) strive to build pages that respond quickly and reliably to web requests. One might say that this is the primary goal of any webapp—to provide a set of HTTP endpoints that reliably handle all the success and failure cases within a specified amount of time, and that don’t topple over under high-traffic conditions. This is made possible, at least in part, by the ability to perform units of work asynchronously. In our case, via background jobs.

At Betterment, we rely on said jobs extensively, to limit the amount of work performed during the “critical path” of each web request, and also to perform scheduled tasks at regular intervals. Our reliance on background jobs even allows us to guarantee the eventual consistency of our distributed systems, but more on that later. First, let’s take a look at the underlying framework we use for enqueuing and executing said jobs.

Frameworks Galore!

And, boy howdy, are there plenty of available frameworks for doing this kind of thing! Ruby on Rails developers have the choice of resque, sidekiq, que, good_job, delayed_job, and now… delayed, Betterment’s own flavor of job queue!

Thankfully, Rails provides an abstraction layer on top of these, in the form of the Active Job framework. This, in theory, means that all jobs can be written in more or less the same way, regardless of the job-execution backend. Write some jobs, pick a queue backend with a few desirable features (priorities, queues, etc.), run some job worker processes, and we’re off to the races! Sounds simple enough!

Unfortunately, if it were so simple we wouldn’t be here, several paragraphs into a blog post on the topic. In practice, deciding on a job queue is more complicated than that. Quite a bit more complicated, because each backend framework provides its own set of trade-offs and guarantees, many of which will have far-reaching implications in our codebase. So we’ll need to consider carefully!
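Before we do, here’s a quick sketch of what the Active Job abstraction looks like in practice. The job class, mailer, and configuration comment below are illustrative assumptions rather than anything prescribed by a particular backend; the point is that the job code stays the same no matter which queue ends up executing it.

```ruby
# A minimal Active Job sketch. The job class, mailer, and arguments are
# hypothetical; any job in the app follows the same basic shape.
class WelcomeEmailJob < ApplicationJob
  queue_as :default

  def perform(user_id)
    user = User.find(user_id)
    UserMailer.welcome(user).deliver_now
  end
end

# Enqueued from anywhere in the app (a controller, a model, another job):
WelcomeEmailJob.perform_later(user.id)

# The execution backend is chosen separately, via configuration, e.g.:
#   config.active_job.queue_adapter = :delayed  # assumes the delayed gem's adapter name
```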
How To Choose A Job Framework

The delayed rubygem is a fork of both delayed_job and delayed_job_active_record, with several targeted changes and additions, including numerous performance & scalability optimizations that we’ll cover towards the end of this post. But first, in order to explain how Betterment arrived where we did, we must explain what it is that we need our job queue to be capable of, starting with the jobs themselves.

You see, a background job essentially represents a tiny contract. Each consists of some action being taken for / by / on behalf of / in the interest of one or more of our customers, and each must be completed within an appropriate amount of time. Betterment’s engineers decided, therefore, that it was critical to our mission that we be capable of handling each and every contract as reliably as possible. In other words, every job we attempt to enqueue must, eventually, reach some form of resolution.

Of course, job “resolution” doesn’t necessarily mean success. Plenty of jobs may complete in failure, or simply fail to complete, and may require some form of automated or manual intervention. But the point is that jobs are never simply dropped, or silently deleted, or lost to the cyber-aether, at any point, from the moment we enqueue them to their eventual resolution. This general property—the ability to enqueue jobs safely and ensure their eventual resolution—is the core feature that we have optimized for. Let’s call it resilience.

Optimizing For Resilience

Now, you might be thinking, shouldn’t all of these ActiveJob backends be, at the very least, safe to use? Isn’t “resilience” a basic feature of every backend, except maybe the test/development ones? And, yeah, it’s a fair question. As the author of this post, my tactful attempt at an answer is that, well, not all queue backends optimize for the specific kind of end-to-end resilience that we look for. Namely, the guarantee of at-least-once execution.

Granted, having “exactly-once” semantics would be preferable, but if we cannot be sure that our jobs run at least once, then we must ask ourselves: how would we know if something didn’t run at all? What kind of monitoring would be necessary to detect such a failure, across all the features of our app, and all the types of jobs it might try to run? These questions open up an entirely different can of worms, one that we would prefer remained firmly sealed.

Remember, jobs are contracts. A web request was made, code was executed, and by enqueuing a job, we said we’d eventually do something. Not doing it would be... bad. Not even knowing we didn’t do it... very bad. So, at the very least, we need the guarantee of at-least-once execution.

Building on at-least-once guarantees

If we know for sure that we’ll fully execute all jobs at least once, then we can write our jobs in such a way that makes the at-least-once approach reliable and resilient to failure. Specifically, we’ll want to make our jobs idempotent—basically, safely retryable, or resumable—and that is on us as application developers to ensure on a case-by-case basis. Once we solve this very solvable idempotency problem, we’re on track for the same net result as an “exactly-once” approach, even if it takes a couple extra attempts to get there.
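To illustrate what that case-by-case work can look like, here’s a rough sketch of an idempotent job. The model and method names are hypothetical; the important part is that the job consults durable state before acting, so a retry after a crash or timeout doesn’t repeat the side effect.

```ruby
# Hypothetical sketch of an idempotent (safely retryable) job.
class SettleTransferJob < ApplicationJob
  def perform(transfer_id)
    transfer = Transfer.find(transfer_id)
    return if transfer.settled? # a previous attempt already finished, so this run is a no-op

    transfer.with_lock do
      next if transfer.settled? # re-check under a row lock to guard against concurrent retries

      transfer.settle! # the side effect itself, performed once no matter how many attempts occur
    end
  end
end
```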
Furthermore, this combination of at-least-once execution and idempotency can be used in a distributed-systems context, to ensure the eventual consistency of changes across multiple apps and databases. Whenever a change occurs in one system, we can enqueue idempotent jobs notifying the other systems, and retry them until they succeed, or until we are left with stuck jobs that must be addressed operationally. We still have to concern ourselves with other distributed-systems pitfalls, like event ordering, but we don’t have to worry about messages or events disappearing without a trace due to infrastructure blips.

So, suffice it to say, at-least-once semantics are crucial in more ways than one, and not all ActiveJob backends provide them. Redis-based queues, for example, can only be as durable (the “D” in “ACID”) as the underlying datastore, and most Redis deployments intentionally trade off some durability for speed and availability. Plus, even when running in the most durable mode, Redis-based ActiveJob backends tend to dequeue jobs before they are executed, meaning that if a worker process crashes at the wrong moment, or is terminated during a code deployment, the job is lost. These frameworks have recently begun to move away from this LPOP-based approach, in favor of using RPOPLPUSH (to atomically move jobs to a queue that can then be monitored for orphaned jobs), but outside of Sidekiq Pro, this strategy doesn’t yet seem to be broadly available.

And these job-execution guarantees aren’t the only area where a background queue might fail to be resilient. Another big resilience failure happens far earlier, during the enqueue step.

Enqueues and Transactions

See, there’s a major “gotcha” that may not be obvious from the list of ActiveJob backends. Specifically, it’s that some queues rely on an app’s primary database connection—they are “database-backed,” against the app’s own database—whereas others rely on a separate datastore, like Redis. And therein lies the rub, because whether or not our job queue is colocated with our application data will greatly inform the way that we write any job-adjacent code.

More precisely, when we make use of database transactions (which, when we use ActiveRecord, we assuredly do whether we realize it or not), a database-backed queue will ensure that enqueued jobs either commit or roll back with the rest of our ActiveRecord-based changes. This is extremely convenient, to say the least, since most jobs are enqueued as part of operations that persist other changes to our database, and we can in turn rely on the all-or-nothing nature of transactions to ensure that neither the job nor the data mutation is persisted without the other. Meanwhile, if our queue lives in a separate datastore, our enqueues are completely unaware of the transaction, and we run the risk of enqueuing a job that acts on data that was never committed, or (even worse) of failing to enqueue a job even when the rest of the transactional data was committed. This would fundamentally undermine our at-least-once execution guarantees!

We already use ACID-compliant datastores to solve these precise kinds of data-persistence issues, so with the exception of really, really high-volume operations (where a lot of noise and data loss can—or must—be tolerated), there’s really no reason not to enqueue jobs co-transactionally with other data changes. And this is precisely why, at Betterment, we start each application off with a database-backed queue, co-located with the rest of the app’s data, with the guarantee of at-least-once job execution.
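As a quick, hedged illustration of what co-transactional enqueueing buys us (the model, association, and job names here are hypothetical):

```ruby
# With a database-backed queue, the job row is written on the same
# connection, inside the same transaction, as the data it refers to.
ActiveRecord::Base.transaction do
  deposit = account.deposits.create!(amount_cents: 10_000)
  ProcessDepositJob.perform_later(deposit.id)

  # If anything in this block raises, the deposit row and the job row
  # roll back together: we never enqueue a job that points at data that
  # was never committed, and we never commit the data without its job.
end
```

With a queue in a separate datastore, that same perform_later call would typically fire immediately, regardless of whether the surrounding transaction ultimately commits.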
By the way, this is a topic I could talk about endlessly, so I’ll leave it there for now. If you’re interested in hearing me say even more about resilient data persistence and job execution, feel free to check out Can I break this?, a talk I gave at RailsConf 2021!

But in addition to the resiliency guarantees outlined above, we’ve also given a lot of attention to the operability and the scalability of our queue. Let’s cover operability first.

Maintaining a Queue in the Long Run

Operating a queue means being able to respond to errors and recover from failures, and also being generally able to tell when things are falling behind. (Essentially, it means keeping our on-call engineers happy.) We do this in two ways: with dashboards, and with alerts.

Our dashboards come in a few parts. Firstly, we host a private fork of delayed_job_web, a web UI that allows us to see the state of our queues in real time and drill down to specific jobs. We’ve extended the gem with information on “erroring” jobs (jobs that are in the process of retrying but have not yet permanently failed), as well as the ability to filter by additional fields such as job name, priority, and the owning team (which we store in an additional column).

We also maintain two other dashboards in our cloud monitoring service, Datadog. These are powered by instrumentation and continuous-monitoring features that we have added directly to the delayed gem itself. When jobs run, they emit ActiveSupport::Notifications events that we subscribe to and then forward along to a StatsD emitter, typically as “distribution” or “increment” metrics. Additionally, we’ve included a continuous monitoring process that runs aggregate queries, tagged and grouped by queue and priority, and that emits similar notifications that become “gauge” metrics. Once all of these metrics make it to Datadog, we’re able to display a comprehensive timeboard that graphs things like average job runtime, throughput, time spent waiting in the queue, error rates, pickup-query performance, and even some top-10 lists of the slowest and most frequently erroring jobs.

On the alerting side, we have Datadog monitors in place for overall queue statistics, like max-age SLA violations, so that we can alert and page ourselves when queues aren’t working off jobs quickly enough. Our SLAs are actually defined on a per-priority basis, and we’ve added a feature to the delayed gem called “named priorities” that allows us to define priority-specific configs. These represent integer ranges (entirely orthogonal to queues), and default to “interactive” (0-9), “user visible” (10-19), “eventual” (20-29), and “reporting” (30+), with default alerting thresholds focused on retry attempts and runtime.

There are plenty of other features that we’ve built that haven’t made it into the delayed gem quite yet. These include the ability for apps to share a job queue but run separate workers (i.e. multi-tenancy), team-level job ownership annotations, resumable bulk orchestration and batch enqueuing of millions of jobs at once, forward-scheduled job throttling, and also the ability to encrypt the inputs to jobs so that they aren’t visible in plaintext in the database. Any of these might be the topic for a future post, and might someday make their way upstream into a public release!
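To make that instrumentation pattern a bit more concrete, here’s a rough sketch of subscribing to a job-related notification and forwarding it to a StatsD client. The event name and payload keys are illustrative placeholders, not the delayed gem’s exact instrumentation API.

```ruby
require "datadog/statsd" # from the dogstatsd-ruby gem

STATSD = Datadog::Statsd.new # assumes a locally running DogStatsD agent

# Subscribe to a hypothetical job-completion event and emit metrics.
ActiveSupport::Notifications.subscribe("perform.delayed_job") do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)

  # event.duration is in milliseconds; the payload keys are placeholders.
  STATSD.distribution("jobs.runtime", event.duration,
                      tags: ["job:#{event.payload[:job_name]}", "priority:#{event.payload[:priority]}"])
  STATSD.increment("jobs.performed", tags: ["queue:#{event.payload[:queue]}"])
end
```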
But Does It Scale?

As we’ve grown, we’ve had to push at the limits of what a database-backed queue can accomplish. We’ve baked several improvements into the delayed gem, including a highly optimized, SKIP LOCKED-based pickup query, multithreaded workers, and a novel “max percent of max age” metric that we use to automatically scale our worker pool up to ~3x its baseline size when queues need additional concurrency.

Eventually, we could explore ways of feeding jobs through to higher-performance queues downstream, far away from the database-backed workers. We already do something like this for some jobs with our journaled gem, which uses AWS Kinesis to funnel event payloads out to our data warehouse (while at the same time benefiting from the same at-least-once delivery guarantees as our other jobs!). Perhaps we’d want to generalize the approach even further.

But the reality of even a fully “scaled up” queue solution is that, if it is doing anything particularly interesting, it is likely to be database-bound. A Redis-based queue will still introduce DB pressure if its jobs execute anything involving ActiveRecord models, and solutions must exist to throttle or rate-limit those jobs. So even if your queue lives in an entirely separate datastore, it can be effectively coupled to your DB’s IOPS and CPU limitations.

So does the delayed approach scale? To answer that question, I’ll leave you with one last takeaway. A nice property that we’ve observed at Betterment, and that might apply to you as well, is that the number of jobs tends to scale proportionally with the number of customers and accounts. This means that when we naturally hit vertical scaling limits, we could, for example, shard or partition our job table alongside our users table. Then, instead of operating one giant queue, we’d have broken things down into a number of smaller queues, each with its own worker pool, emitting metrics that can be aggregated with almost the same observability story we have today. But we’re getting into pretty uncharted territory here, and, as always, your mileage may vary!

Try it out!

If you’ve read this far, we’d encourage you to take the leap and test out the delayed gem for yourself! Again, it combines both delayed_job and its ActiveRecord backend, and should be more or less compatible with Rails apps that already use ActiveJob or DelayedJob. Of course, it may require a bit of tuning on your part, and we’d love to hear how it goes!

We’ve also built an equivalent library in Java, which may also see a public release at some point. (To any Java devs reading this: let us know if that interests you!)

Already tried it out? Any features you’d like to see added? Let us know what you think!