Getting message status and task result


#1

Celery has a somewhat broken AsyncResult “proxy value” which contains the state of the task: pending, started, success, failure, etc. This is quite useful to determine whether a task is still waiting, or processing, or has finished.

Looking at Dramatiq’s Results section in the Cookbook, it seems that I can check whether a task has finished by attempting to fetch its result (provided a properly set up backend). How would I determine whether a task is still waiting in the queue or is currently processing?

Thanks!


#2

There is no functionality to check on the state of a message right now, though I do plan on adding it. Not sure exactly on the timeline, though. If you need something like that right now, you can write a middleware to DIY:

class MessageStatusMiddleware(Middleware):
  def after_enqueue(self, broker, message, delay):
    store_message_status_somewhere(message.id, "enqueued")

  def before_process_message(self, broker, message):
    store_message_status_somewhere(message.id, "processing")

  def after_process_message(self, broker, message, *, result=None, exception=None):
    store_message_status_somewhere(message.id, "done")

  after_skip_message = after_process_message

#3

I will need to think about the store_message_status_somewhere() as this looks like it can quickly become an ugly reach across code.

Might not need it right now but in the nearish future would be nice! :upside_down_face:Pleeeeeez?


#4

Hi, in some scenarios as a workaround to .ready() you can consider the following kind of code:

async_tasks = []
for n in [2,4,6,8,10]:
    async_tasks.append(mytask.send(n))
while async_tasks:
    for task in async_tasks:
        try:
            result = task.get_result(block=False)
            print(f'Task {task} finished, result is {result}')
            async_tasks.remove(task)
        except ResultMissing:
            print(f'{task} is still working')
        sleep(2)