Track status/result of a task


#1

This question was originally posted on GitHub under: https://github.com/Bogdanp/dramatiq/issues/93

Hi,

thank you for the hints and tried some of them, especially the results but I am still not happy with my result.

Let me describe you how what I want to achieve:

  1. I sent a Message via an Actor with message = add.send(1, 2) - from there I get back an message_id from the broker
  2. I want to store this message_id e.g. in a MySQL database before it is accessible with message.get_result(backend=backend) --> Here I do not know if any of the Middleware functions are of any help for this use case
  3. If 1 and 2 are in place, I can distinguish between an existing message/task and a non-existing one, by querying the database.
  4. Further I can update the status of the message/task in the database over the time of the process.

Answer from @bogdan:
To answer your question, though, this is possible via middleware. You can check out the AdminMiddleware in django_dramatiq for an example.

Further question:
I checked it out but this seems to be very tightly integrated into django, is it possible to adapt the behavior also to Flask?

Question for usage of this in django_dramatiq_example
How can I use the task class from django_dramatiq
in the django example provided, so using the variables that stores my function result into message_data:

id = models.UUIDField(primary_key=True, editable=False)
status = models.CharField(max_length=8, choices=STATUSES, default=STATUS_ENQUEUED)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
message_data = models.BinaryField()

to use with for example a MySQL DB, because these seem to be overwritten with
class Job(models.Model) in the provided example.

Best,
Max


#2

Sorry I wasn’t super clear the first time. You can’t just use that code unless you use Django (or at least the Django ORM). To use it with Flask, you’ll have to define your own model (assuming you use an ORM like SQLAlchemy) and then implement your own version of the AdminMiddleware, the links were meant to just serve as examples of how you might implement something like that yourself. :slight_smile:


#3

@bogdan

Okay, that is what I experienced yesterday - just wanted to make sure.

In case I want to use the django_dramatiq package, how can I store the metadata in a database like MySQL.
I am referring here to my second question

Question for usage of this in django_dramatiq_example

I know I have to activate that somehow, as it is set to default but can you give me a hint how I can activate that?
If I understand the task class correctly, in message_data the task result is stored or?


#4

In that case message_data is the encoded message that was sent to the broker. For your purposes, you won’t have to store that unless you want to save the arguments and options that were sent as part of the message in your DB. If all you want to do is store the status, then you won’t need that.

In dramatiq, when you call .send() on an actor, that ends up creating an instance of Message that instance is then converted to a dictionary and then to JSON and sent to your broker. The Task model in django_dramatiq stores that JSON data so that it can display it in the admin.

Hope that makes sense!


#5

That makes a lot of sense - so steps I did are:

  • Created admin user
  • I was able to see the Message Details and Status, Created and Updated

What I am still missing now:

  • Can I access this data (the raw JSON) that is in the admin console and if so how?
  • If I want to store the result that my Actor is returning, for this I would need a backend, e.g. Redis to call the get_result or is it possible to store the result in a Database?

#6

While using django_dramatiq? Yes, just send a message and then click on it in the admin.

You can store the result in a Database without using a result backend. Check out the after_process_message method on Middleware. The result parameter is the return value of the actor so your middleware could take that value and store it in a database. In fact, that’s exactly how result backends work: https://github.com/Bogdanp/dramatiq/blob/master/dramatiq/results/middleware.py#L68


#7

That makes total sense, I will implement this :slight_smile:

I have not expressed myself well here.
So what I want is to access the JSON and its fields:

id = models.UUIDField(primary_key=True, editable=False)
status = models.CharField(max_length=8, choices=STATUSES, default=STATUS_ENQUEUED)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
message_data = models.BinaryField()

without the admin tool - so programmatically - I guess it needs to be stored somewhere so it should be possible.


#8

@bogdan

I just wanted to follow up on my last question, if it is possible to access the JSON that is visible in the admin console, on any other way.


#9

I don’t think I fully understand the context around the question. I could use a concrete example to help me understand what you’re trying to achieve.

If you store the JSON in the database, then you can retrieve it, yes. In the case of django_dramatiq, you can use Message.decode to load it from the message_data by querying a specific message or using the .message property on Task instances. Concretely, you can do it like this with django_dramatiq:

>>> task = Task.tasks.get(pk="some-id")
>>> task.message
Message(...)

#10

I think I am not using the django_dramatiq in the way it should be used but yes, I wanted to store the JSON in DB rather than have to access the django admin control panel.

I am currently using your django_dramatiq_example and I tried to apply the .message on the Job which returns an error: AttributeError: 'Job' object has no attribute 'message' as this is not a Task instance.
Are you using a Task in your example repo?


I tried it out on your flask_dramatiq_example where you have defined def after_process_message(self, broker, message, *, result=None, exception=None): as middleware.

When I tried to retrieve the result with the lines you mentioned:

    actor = broker.get_actor(message.actor_name)
    store_results = actor.options.get("store_results", self.store_results)
    result_ttl = actor.options.get("result_ttl", self.result_ttl)

I am getting the following error:

   store_results = actor.options.get("store_results", self.store_results)
   AttributeError: 'AppContextMiddleware' object has no attribute 'store_results'

I guess this is not the correct way to retrieve the result or? I just want to store the output of the result variable in my DB.


#11

Jobs in that repo have nothing to do with Tasks. You’ll want to from django_dramatiq.models import Task and query those objects.

The result is the value of the result parameter to the after_process_message method. The lines you mention are configuration for the ResultMiddleware and the reason you get an AttributeError is probably because store_results isn’t an instance variable on your class.

Let’s say you have a MessageResults model, you could store the result like so:

from dramatiq import Middleware

class DBResultsMiddleware(Middleware):
  def after_process_message(self, broker, message, *, result=None, exception=None):
    if exception is None:
      message_result = MessageResult(id=message.message_id, result=result)
      message_result.save()

#12

@bogdan thank you for the answers, I will check them tommorow and come back to you.

At the moment I have one more question for the django_dramatiq_example.
When I call the process_job.send(job.pk) in views.py I retrieve the message_id.
I modified the def process(self): in models.py to return an return value.

How can I retrieve the return value? Do I need to have a result backend or can I just modify the the line in job.process() in tasks.py to result = job.process() and then for example store the result in a DB?