Detect if it's retry


Is it possible to detect inside the actor if it was called from the Retries middleware?

The only way I could think was to change the Retries.after_process_message to add a flag to the message.kwargs to signal it’s a retry.

Is there any better way?



There’s no builtin way to do it right now, but you could potentially achieve this with a custom middleware like this (untested!):

from threading import local

from dramatiq import Middleware

class CurrentMessageManager:
    STATE = local()

    def get_current_message(self):
            return self.STATE.message
        except  AttributeError:
            raise RuntimeError("No current message.")

    def set_current_message(self, message):
        self.STATE.message = message

    def clear_current_message(self):
        del self.STATE.message

class CurrentMessageMiddleware(Middleware):
    def __init__(self, manager):
        self.manager = manager

    def before_process_message(self, broker, message):

    def after_process_message(self, broker, message, *, result=None, exception=None):

Then you’d use it when setting up your broker:

import dramatiq
from dramatiq.brokers.rabbitmq import RabbitmqBroker

current_message_manager = CurrentMessageManager()
broker = RabbitmqBroker()

And inside your actors:

def do_stuff():
    message = current_message_manager.get_current_message()
    print(message.options.get("retries", 0))

Hope that helps!


This looks awesome, I like how one can access the message object, this can be very useful.