Detect if it's a retry


#1

Is it possible to detect inside the actor if it was called from the Retries middleware?

The only way I could think of was to change Retries.after_process_message to add a flag to message.kwargs to signal that it’s a retry.

Is there any better way?

Thanks!


#2

There’s no built-in way to do it right now, but you could potentially achieve this with a custom middleware like this (untested!):

from threading import local

from dramatiq import Middleware


class CurrentMessageManager:
    STATE = local()

    def get_current_message(self):
        try:
            return self.STATE.message
        except AttributeError:
            # Nothing has been set on this thread yet.
            raise RuntimeError("No current message.") from None

    def set_current_message(self, message):
        self.STATE.message = message

    def clear_current_message(self):
        # Tolerate being called when no message is set on this thread.
        try:
            del self.STATE.message
        except AttributeError:
            pass


class CurrentMessageMiddleware(Middleware):
    def __init__(self, manager):
        self.manager = manager

    def before_process_message(self, broker, message):
        self.manager.set_current_message(message)

    def after_process_message(self, broker, message, *, result=None, exception=None):
        self.manager.clear_current_message()
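
In case it's not obvious why STATE is a threading.local: workers process messages on several threads, and each thread needs to see only its own message. Here is a tiny stdlib-only sketch of that behaviour (the worker function and the message strings are made up for illustration, nothing here is dramatiq API):

```python
import threading

STATE = threading.local()
seen = {}


def worker(name, barrier):
    # Each thread stores its own "current message" on the shared local object.
    STATE.message = f"message-for-{name}"
    # Wait until both threads have written before either one reads back.
    barrier.wait()
    seen[name] = STATE.message


barrier = threading.Barrier(2)
threads = [threading.Thread(target=worker, args=(n, barrier)) for n in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never set a message, so it has no such attribute.
main_has_message = hasattr(STATE, "message")
```

Each thread reads back the value it wrote itself, which is why get_current_message() inside an actor should return the message that thread is currently processing.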

Then you’d use it when setting up your broker:

import dramatiq
from dramatiq.brokers.rabbitmq import RabbitmqBroker

current_message_manager = CurrentMessageManager()
broker = RabbitmqBroker()
broker.add_middleware(CurrentMessageMiddleware(current_message_manager))
dramatiq.set_broker(broker)

And inside your actors:

@dramatiq.actor
def do_stuff():
    message = current_message_manager.get_current_message()
    # "retries" is absent on the first attempt, so default to 0.
    print(message.options.get("retries", 0))
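
If all you need is a yes/no answer, a small helper along these lines might do. This is only a sketch: is_retry is a made-up name, it assumes the retry count lives under the "retries" option as in the print above, and the SimpleNamespace objects are stand-ins for real messages so the snippet runs without a broker.

```python
from types import SimpleNamespace


def is_retry(message):
    """Return True if the message has been retried at least once."""
    # Assumption: a missing "retries" option means this is the first attempt.
    return message.options.get("retries", 0) > 0


# Stand-ins for real messages; only the .options dict matters here.
first_attempt = SimpleNamespace(options={})
second_attempt = SimpleNamespace(options={"retries": 1})

print(is_retry(first_attempt))   # False
print(is_retry(second_attempt))  # True
```

Inside an actor you would pass it the result of current_message_manager.get_current_message().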

Hope that helps!


#3

This looks awesome. I like how one can access the message object; this could be very useful.

Thanks!