Back

Multi-tenant queues in Postgres

186 points22 hoursdocs.hatchet.run
ndriscoll15 hours ago

You've got something wonky going on with that query plan for the 2nd partition by attempt. In particular the seq scan on tasks to do the `(tasks.id = eligible_tasks.id)` hash join seems odd. The filter on queued status in `CTE eligible_tasks` (and not in the last join) also seems weird. Is that plan for the same query in the article?

If you add an index on `group_key, id WHERE status = 'queued'` and remove the 2nd `WHERE tasks."status" = 'QUEUED'` (I believe that's redundant?), you might get a better plan. You'd want something to make sure you're not preferring one group_key as well.

I think you should be able to solve your problem with workers having zero tasks by moving the LIMIT into the second CTE?

It's also useful in practice to have something like a worker_id and timestamp and not just set status to RUNNING in case a worker gets stuck/dies and you need to unclaim the work.

abelanger14 hours ago

Ah, great catch - I just pushed an update to match the query from the article. I wasn't looking at much except for the WindowAgg line, thank you!

I tried with a similar indexing strategy - it did make a very noticeable difference, breaking at about 40000 enqueued tasks instead of 25000. I left indexing out of the article because it can open up a different can of worms with performance degradation over a longer time horizon.

I also tried with an `ORDER BY RANDOM()` across group keys first, which does help with fairness but breaks the "deterministic" element.

ndriscoll13 hours ago

With your updated plan, you have

> Hash Cond: (tasks.id = t1.id)

> -> Seq Scan on tasks (cost=0.00..254.60 rows=48 width=14) (actual time=0.566..10.550 rows=10000 loops=1)

> Filter: (status = 'QUEUED'::"TaskStatus")

So it's still doing a seq scan on tasks when I'd expect it to join using the PK. It must be getting tripped up by the redundant filter on queued status. Try removing that.

I ninja edited my previous comment, but if you move the LIMIT to the 2nd CTE, that should fix your issue with workers not getting work. If you do that and add the other index I think in principle it should be able to do everything by maintaining a priority queue of the heads of each partition (which are each pre-sorted by the index now). idk if pg does that though. If it does, then that portion of the query should be streamed, so you don't need to try to limit it early to avoid a sort of 10k elements when you only need 10. Then if you remove the redundant QUEUED check, it should be doing everything else through indexes.

Basically, if doing this manually I'd expect the "good" solution to do this in a way where starting from an index, each row is streamed (i.e. no full sort) with logn complexity. So I look at it from a perspective of "how do I get the database to do what I'd do by hand?"

abelanger13 hours ago

I created a gist with these recommendations - certainly an improvement, but I don't think it gets around the `WindowAgg` running across all 10k rows: https://gist.github.com/abelanger5/5c1a75755072239716cb587a2.... Does this accurately implement your suggestions?

Happy to try out other suggestions, but I haven't found a way to get a window function or `JOIN LATERAL` to scale in near-constant time for this queueing strategy.

ndriscoll12 hours ago

It looks like now it does still only pull 100 rows out of the sort (so moving the limit into the 2nd cte didn't hurt). It isn't doing all 10000 rows now though, which is interesting. By any chance, do you have 9200 different tenants? If so that makes sense. What I suggested would work when you have a small number of tenants with queued work (it scales n log n with tenants with queued work, but log n with amount of tasks that a single tenant has queued). So if you're currently testing with many tenants queueing at once, you could see how it behaves with like 20 tenants where one has 9k items and the others have ~50 each. Then it sort of depends on how your distribution looks in practice to know whether that's acceptable.

You could also probably do tricks where individual workers filter to specific tenant IDs in the first CTE (e.g. filter group_key mod num_workers = worker_id) to reduce that cardinality if you expect it to be large. Or you could e.g. select 100 random group_keys as a first step and use that to filter the window, but then that needs another partial index on just `group_key where status = queued`.

Edit: I see it's still doing a seq scan on tasks. There is a PK on id, right? It knows there's only 100 rows from the rest of the query so it seems odd to me that it would decide to scan the table. You could try putting a hash index on id if it's refusing to use the btree index I guess. Or it might change its mind if you add 1M SUCCEEDED tasks or something.

Another thing to consider is that out of the box, pg's default config for the planner is tuned to like 20 year old hardware. You need to tweak the io costs for SSDs and tell it you have more RAM if you haven't done that. See e.g. https://pgtune.leopard.in.ua/ for better starting values.

plandis15 hours ago

At a previous job we did something similar but ended up having workers first poll another table to determine which tenant to query against. We called these items tokens and they represented a finite amount of dedicated thread time for processing a specific tenants’ queue.

What this looked like was a worker thread would first query the token table for which tenant to process eligible tasks from, and then update the token to take a timed lock and during that time would solely process eligible tasks from a specific tenant.

This has some nice properties:

1. You can scale different tenants using different amounts of tokens which means different but controlled amounts of thread time.

2. It allows for locality on your worker thread. Within a specific tenant the processing was usually similar so any shared resources could be cached and reused after polling for additional eligible tasks from the tenants queue.

magicalhippo5 hours ago

Reminded me of the token bucket[1] algorithm. Good point about locality.

[1]: https://en.wikipedia.org/wiki/Token_bucket

klysm4 hours ago

I like this approach a lot, but I’m unsure about time based vs number of items based fairness. I guess it really depends on the application.

jperras16 hours ago

Is the `FOR UPDATE SKIP LOCKED` in the CTE necessary? Granted my understanding of Postgres row-level locking and their interaction with CTEs may be a bit lacking, but according to the docs[1]:

  The sub-statements in WITH are executed concurrently with each other and with the main query. Therefore, when using data-modifying statements in WITH, the order in which the specified updates actually happen is unpredictable. All the statements are executed with the same snapshot (see Chapter 13), so they cannot “see” one another's effects on the target tables.

1. https://www.postgresql.org/docs/current/queries-with.html#QU...
mslot16 hours ago

Read committed mode (PostgreSQL's default) can get pretty funky.

If two transactions concurrently perform a SELECT (may be in a CTE) followed by an UPDATE, then they might see and try to update the same rows. That's often undesirable, for instance in the example of a queue where messages are supposed to arrive ~once. Serializable mode would "solve" the problem by letting one transaction fail, and expects the application to retry or otherwise deal with the consequences.

FOR UPDATE is a precision tool for working around read committed limitations. It ensures rows are locked by whichever transaction reads them first, such that the second reader blocks and (here's the funky part) when the first transaction is done it actually reads the latest row version instead of the one that was in the snapshot. That's semantically a bit weird, but nonetheless very useful, and actually matches how updates work in PostgreSQL.

The biggest issue with SELECT..FOR UPDATE is that it blocks waiting for concurrent updaters to finish, even if the rows no longer match its filter after the update. The SKIP LOCKED avoids all that by simply skipping the locked rows in the SELECT. Semantically even weirder, but very useful for queues.

jperras15 hours ago

Ah, I see - the default transactional isolation level is what I wasn't accounting for.

Thanks for the explanation! Very much appreciated.

nosefrog9 hours ago

Is a fair queue worth it vs spinning up more capacity? I've worked on multiple projects where we've ended up ripping out a queue and just spinning up more machines to handle the load synchronously instead.

vidarh6 hours ago

More capacity won't address operations that the originator isn't willing to (or can't) hang around to wait for and/or that are long-running enough that restarts due to failures might be needed. That's the most immediate reason: Tasks where no amount of capacity will remove the need to have some form of queueing mechanism.

For complex enough workflows, queues are also often helpful at addressing potentially failing stages in ways that are easier to debug. But in that case you want your queue to be closer to a state machine where actually waiting in the queue for much time is the exception. You can just build a state machine for that too, ensuring the inputs to the stage about to execute are recorded in a durable, restartable way. But sometimes you may need more copies of the same type of job, and soon you have something that looks and smells much like a queue anyway.

Then lastly, spikes. But they only really help well enough if you still spin up more machines aggressively enough that the wait time doesn't get long enough to be perceived as just as bad as or worse than an immediate error, so it does make sense to ask your question.

A queue also doesn't need to be complex. If it gets complex, that increases the reasons to ask your question for that specific system. If it potentially grows large, as well (sometimes the solution to that is simply to refuse to queue if the queue exceeds a certain size or the processing time goes above a certain threshold).

Queues are great when appropriate, but they do often get used as a "solution" to a scaling problem that hasn't been sufficiently analyzed, which sounds like might have been the case in your examples.

strken4 hours ago

Is it a choice? Most projects I've worked on had times when they became overwhelmed with requests; a queue handles this case, but more capacity just makes it rarer. Ideally you want enough capacity to handle X% of requests within Y milliseconds and a queue to deal with the leftovers, and I suppose if your X is low enough then a fair queue becomes a necessity.

datascienced7 hours ago

I guess a queue handles spikes, and is OK for async-allowed operations such as generating a PDF and email it over say loading a web page.

A queue may give you time to scale up too?

time0ut13 hours ago

Very cool. Bookmarked in case I ever need to do this.

I have implemented a transactional outbox in postgres using a simpler version of this plus a trigger to notify listening workers. It worked well and vastly outpaced the inserting processes. It easily handled several million tasks per hour without issue.

It is also nice the article showed the correct CTE based form of the query. It is possible to naively write the query without it and sometimes get way more tasks than you asked for when there are concurrent workers. I discovered that pretty quickly but it had me pulling my hair out…

aranw4 hours ago

Thanks for this write up. Really interesting I've built queues using Postgres before but never anything this complex so I'm sure this article will come in use and be handy in future!

jvolkman13 hours ago

I have my queue workers maintain a list of task owners they've already processed, and prefer to get a task from an owner they've least-recently seen (or haven't seen) using `ORDER BY array_position(:seen_owner_ids, owner_id) desc`. Each new task's owner_id is inserted into the front of the list (and removed elsewhere if it exists).

But I have a relatively small number of possible `owner_id` values at any given time.

phibz12 hours ago

Why not something like Kafka or Redis?

teraflop8 hours ago

The most straightforward reason is that if you need a transactional database anyway, then moving the queue into the DB allows you to atomically en/dequeue messages at the same time as making other updates. Which can massively simplify your architecture because it eliminates an enormous category of possible failure modes. (Or it can massively improve your system correctness, if you didn't realize those failure modes were possible.)

dewey10 hours ago

Also most of the times you already have a database so why not use that instead of adding another service to the pile.

klysm4 hours ago

Using Kafka as a work queue is widely documented as a mistake. Using a single database results in a lot of operational simplicity and you get to skip a lot of distributed systems when all your state is in a single system

hipadev2311 hours ago

Because that’s sane, easy, and boring.

mind-blight9 hours ago

Super cool! I was looking at the self hosted quickstart, and it looks like Docker compare installs both hatchet and RabitMQ. Does hatchet use rabbit alongside Postgres?

ucarion17 hours ago

This is pretty fancy stuff! Sorry if I'm just not reading carefully enough, but does this approach account for tenants whose messages take longer to process, as opposed to a tenant that sends a larger volume of messages?

abelanger16 hours ago

No, this assumes you've already split your tasks into quanta of approximately the same duration - so all tasks are weighted equally in terms of execution time. If each of the tasks have different weights you might be looking at something like deficit round-robin [1], which I've looked at implementations for in RabbitMQ and we're thinking about how to implement in PG as well [2].

[1] https://www.cs.bu.edu/fac/matta/Teaching/cs655-papers/DRR.pd...

[2] https://nithril.github.io/amqp/2015/07/05/fair-consuming-wit...

ucarion12 hours ago

Makes sense!

My gut tells me that it would often make sense to jump straight to shuffle sharding, where you'd converge on fair solutions dynamically, in a lot of cases. I'm looking forward to that follow-on article!

wbeckler10 hours ago

I love your animations! How did you do those?

abelanger10 hours ago

Thank you! I've been using https://jitter.video with the Lottie exporter. It also has a Figma plugin so you can reuse components.

andrewstuart17 hours ago

I wonder if Postgres RBAC row based access control is another solution to this.

hackermeows11 hours ago

[dead]

throwaway9843937 hours ago

[dead]