Use case for dramatiq


#1

Hey Bogdan, first of all thanks a lot for creating this, I really hope it takes off, because the quality looks really nice compared to anything else from the Py ecosystem.

I browsed your website and the documentation for quite a while these past two days, and I must say I like what I see. The documentation is also of very good quality, with concrete examples. If there’s anything to improve, I could only think of the “client” side (sending / creating messages), for instance: best practices, “client”-only actors and the like.

When it comes to making it easier for newcomers to decide on using it, it would be really cool to have some sort of use cases / success stories, maybe your own?

To help with this, I’d like to present my first intended use case, and maybe you could provide some feedback and thoughts on how best to deal with it. This project consists mainly of two sorts of tasks: those that do a rough analysis of existing data and sniff out potential leads, and those that look very deeply into those leads. All of the processing is distributed, currently running on a cluster of custom-built AMD Ryzen boxes (each box with one CPU, 8 cores, 16 SMT threads).

The tasks sniffing out leads do the exact same work on various data partitions. The throughput is roughly 10k jobs per 24h, every task takes several minutes to process. These tasks are created all at once for a given set of data. In case a lead is found, the data about that lead is persisted in a DB and another (auxiliary) task is created for the purpose of creating the tasks for analyzing that lead. Depending on the lead, 10’s to 1000’s of tasks are submitted. Each of them takes up to 10 minutes to process (average ~3 minutes). The jobs are not limited by IO, RAM or other factors, it’s all about CPU processing power.

Goals:

  • all tasks must be processed, this is paramount
  • use all available systems at their fullest, maximum processing power efficiency. The customer plans to add AWS nodes into the mix
  • be able to run all types of tasks (sniffing tasks, lead analysis tasks and auxiliary tasks) on the same node, with sniffing jobs having the highest priority. Three priorities would do it right now

How would you deal with this using dramatiq? How many worker processes would you start on every box? Would you use threads at all, or just processes given that the tasks take minutes? I thought about 16 processes per box (1 for every SMT).

Obviously, we will have to experiment with these things on our own, but I’m really looking forward to your thoughts and suggestions.

Edit: using Ubuntu 18.04 LTS, Python 3.6, RabbitMQ

Thank you!


#2

Thanks!

In addition to giving them separate priorities, I’d assign the three actors separate queues.

16 processes and a low number of threads or no threads at all is also what I would start with, though it would be helpful to understand what the processing code actually looks like. Is it all number crunching or traversing large data structures or what?

If the code really is completely CPU bound and tasks can take that long to process, then you should make sure to set your RMQ heartbeat interval (1, 2) to a high enough value to avoid the same message being redelivered multiple times. When a worker picks up a message from RMQ, it’s moved into the “unacked” state, where it stays either until the worker acknowledges that message or until the worker disconnects without doing so; the way RMQ figures out whether a worker has disconnected is through heartbeats. Alternatively, you need to make sure the interpreter is able to switch threads every now and then (every “system call” is a chance for it to do so, because every one of those calls releases the GIL for the current thread).
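To illustrate the thread-switching point, here is a small stdlib-only sketch (not Dramatiq code) in which a background thread stands in for a heartbeat sender while the main thread does CPU-bound work. In CPython 3, pure-Python loops are interrupted at the interpreter's switch interval, so the background thread keeps getting turns; the situation to watch for is a long call into C extension code that holds the GIL throughout. The names `heartbeat` and `crunch` are made up for the example.

```python
import sys
import threading
import time

ticks = 0
stop = threading.Event()


def heartbeat(interval=0.01):
    # Stand-in for a heartbeat sender: it only needs a brief turn on the GIL.
    global ticks
    while not stop.is_set():
        ticks += 1
        time.sleep(interval)


def crunch(seconds=0.3):
    # Pure-Python busy loop; the interpreter can switch threads between bytecodes.
    deadline = time.perf_counter() + seconds
    total = 0
    while time.perf_counter() < deadline:
        total += 1
    return total


thread = threading.Thread(target=heartbeat, daemon=True)
thread.start()
crunch()
stop.set()
thread.join()

print("switch interval (s):", sys.getswitchinterval())
print("heartbeat ticks during crunch:", ticks)
```

If the tick count stays near zero when `crunch` is replaced by the real processing code, that is a hint the code holds the GIL for long stretches and a generous heartbeat interval is the safer option.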

Hope that helps!


#3

Interesting. I can see the advantages when dealing with issues, and reporting of what’s going on benefits from this quite a bit too. Are there also performance / scheduling reasons behind doing this?
Will priorities work across multiple queues?
Dramatiq supports prioritizing actors, but this is not very clear:

priority: The actor’s global priority. If two tasks have been pulled on a worker concurrently and one has a higher priority than the other then it will be processed first. Lower numbers represent higher priorities.

Specifically, what I’d like to happen is: as long as there are high priority messages enqueued, only these should be [ideally] processed. The documentation only covers what happens when two tasks are pulled, that belong to what I assume are two different actors of two different priorities. What will happen when the higher priority task (i.e. belonging to the higher priority actor) has finished processing? I assume it will process the low priority one, even when there are other higher priority tasks enqueued.

Right now I’m using the dramatiq tool for running the workers, and it doesn’t appear to support --threads 0. Would you recommend creating my own runner?

It depends. Some part of the crunching is on very little data, that fits in the CPU cache, but most of it involves larger data that will be pulled from the RAM. Aside from the time it takes to pull the data from the network (few ms) when starting the task and then to pull it from the RAM, there is only CPU crunching.

Thanks for the tips, all of them very helpful.


#4

Yes.

The way it works is that Dramatiq spawns one consumer thread per queue. For each queue, its consumer will pull some number of messages into memory and then push them onto a shared, in-memory priority queue that the workers then dequeue from in order to get work. That means that as long as there are high priority messages coming in off a particular queue, only those messages will be processed. As soon as that flow stops or there is a temporary break in it, other, lower priority messages will be processed.
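A rough stdlib model of that mechanism, not Dramatiq's actual implementation: one consumer step per queue pushes prefetched messages onto a single shared `queue.PriorityQueue`, and a single worker loop pops from it. The queue names, priorities and prefetch size are assumptions for the example, and unlike a real consumer this model does not cap how many messages are held in memory.

```python
import queue
from collections import deque

# Hypothetical broker-side queues: name -> (priority, pending messages).
remote = {
    "sniffing": (0, deque(f"sniff-{i}" for i in range(4))),
    "analysis": (20, deque(f"analyze-{i}" for i in range(4))),
}

PREFETCH = 2  # assumed number of messages a consumer pulls at a time
shared = queue.PriorityQueue()  # the in-memory queue the workers read from
seq = 0  # tie-breaker so equal priorities stay in arrival order


def consume(name):
    """Pull up to PREFETCH messages from one remote queue into the shared queue."""
    global seq
    priority, pending = remote[name]
    for _ in range(min(PREFETCH, len(pending))):
        shared.put((priority, seq, pending.popleft()))
        seq += 1


processed = []
while any(pending for _, pending in remote.values()) or not shared.empty():
    # Each consumer tops up the shared queue, then one worker takes the best message.
    for name in remote:
        consume(name)
    _, _, message = shared.get()
    processed.append(message)

print(processed)
```

In this model the low priority messages sit in the shared queue until no high priority message is available, which mirrors the "temporary break in the flow" behaviour described above.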

The docs could be much better here. :)

Sorry, I was super unclear there. You should run Dramatiq with --processes=16 --threads=1 for 1 worker thread per process.


#5

I kept radio silence until now, but that doesn’t mean I haven’t been putting dramatiq through its paces.
I only had one problem: a worker process spawned by dramatiq fell victim to the Linux OOM Reaper because of the work it was doing, not because of dramatiq itself. The worker process was terminated by the Reaper and left as a zombie, but it was not released. I couldn’t kill it manually either. In order to reclaim it as a worker process I had to restart dramatiq.

It would be nice if dramatiq handled such events better; especially if we think about running it on 100’s or 1000’s of boxes, self-healing makes sense.

I will keep posting here my observations and experiences, but so far I am happy with what I see, it works like clockwork.

One suggestion for improving the documentation: the API reference for @dramatiq.actor is missing time_limit right now. EDIT: that’s actually a middleware option, covered by **options


#6

I think the appropriate behaviour here would be for the main process to stop its workers and shut down cleanly then exit with the appropriate code. That seems less operationally surprising to me than it attempting to recover by itself (though I know, for example, that gunicorn does something along those lines). Under that kind of a scenario it would be up to the user to, for example, specify Restart=on-failure in a systemd unit.

Happy to hear that!