Synchronize web view handling and message sending


Consider the following scenario with the Pyramid web framework. Within a view handler a new db object is created and added to the view’s db transaction, i.e. the object will be persisted upon successful view execution. In that same view a Dramatiq actor is called, and that actor needs to access the newly created db object.

We have a race here: the actor may read from the db before the object has been persisted. I see three options:

  1. The clean solution would be to send the message only when the view was successful (and after the view’s db transaction has committed).
  2. The hacky solution would be to use the send_with_options(…, delay=n) function, but it seems that delay causes a short synchronous wait (i.e. it would stall the view handler) rather than an asynchronous message delivery?
  3. Poll on the actor side until the object appears in the actor’s db session (worse than 2. me thinks).
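To make the race concrete, here is a minimal sketch that uses two SQLite connections as stand-ins for the view's db session and the actor's db session. The table, the `lookup` helper and the `demonstrate_race` function are made up for illustration and involve neither Pyramid nor Dramatiq; the only point is that a second connection cannot see a row until the first one commits.

```python
import os
import sqlite3
import tempfile


def lookup(conn, object_id):
    """Return the row as the actor would see it, or None if it is not visible."""
    return conn.execute(
        "SELECT name FROM objects WHERE id = ?", (object_id,)
    ).fetchone()


def demonstrate_race():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        view_conn = sqlite3.connect(path)   # stands in for the view's db session
        actor_conn = sqlite3.connect(path)  # stands in for the actor's db session
        try:
            view_conn.execute(
                "CREATE TABLE objects (id INTEGER PRIMARY KEY, name TEXT)"
            )
            view_conn.commit()

            # The view adds the object, but its transaction is still open.
            view_conn.execute("INSERT INTO objects (id, name) VALUES (1, 'new')")
            before_commit = lookup(actor_conn, 1)

            # Once the view's transaction commits, the actor can read the object.
            view_conn.commit()
            after_commit = lookup(actor_conn, 1)
        finally:
            view_conn.close()
            actor_conn.close()
    return before_commit, after_commit
```

An actor that runs between the insert and the commit is in the `before_commit` situation, which is why sending the message only after the commit avoids the problem.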

What other options exist? Is there a “queue messages and send later” approach already available?


Regarding option 2: this isn’t correct. The message is sent immediately and is only processed on the worker after the delay. I wouldn’t go with this approach since there are all sorts of reasons your delay could be too small, and it’s a nightmare for maintainability.

Regarding option 1: this is the right approach. Either manually commit the transaction in your view (I’m not familiar with Pyramid, but I assume this is possible) and then send the message, or use some sort of commit hook (I assume Pyramid has something similar to Django’s on_commit).


Hehehe :wink: You didn’t like my third choice?

Anyway, I’ve tried the following in a Pyramid view function using the transaction’s after-commit hook, and it works:

def some_view(request):
    # Do some view stuff, modify db objects, etc.

    # Local helper function that issues the message to the broker. This
    # function has access to the closure, but beware that by the time it
    # runs the db objects have been committed and are no longer valid!
    def _send_message(success, *args, **kwargs):
        if success:
            # Assumption: some_actor is the Dramatiq actor to call.
            some_actor.send(*args, **kwargs)

    # Get the transaction for this request (assumes pyramid_tm, which
    # exposes the transaction manager as request.tm).
    t = request.tm.get()

    # Add the after-commit hook, i.e. call _send_message() when the
    # request's transaction has committed.
    t.addAfterCommitHook(_send_message, args=(1, 2, 3), kws={})

    # And return from the view function.
    return {
        "some": "results",
    }
There’s also an example over at websauna which calls their apply_async() function with a tm (transaction manager) parameter.

@bogdan, what do you think about adding a tm parameter to send_with_options() which, instead of delay, sends the task when the transaction has committed? For example:


The code could be as simple as the example above, and delay and tm should probably be mutually exclusive.


I don’t think it would make much sense to couple Dramatiq to that mechanism.

You could subclass Actor in your own app and add a send_on_commit method (which it seems the websauna folks are doing):

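from dramatiq import Actor

class TransactionalActor(Actor):
  def send_on_commit(self, tm, args=None, kwargs=None):
    def send(success):
      # Only send the message if the transaction committed successfully.
      if success:
        self.send_with_options(args=args or (), kwargs=kwargs or {})

    # Register the hook on the current transaction.
    tm.get().addAfterCommitHook(send)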

Unfortunately, there’s no easy way to change the actor base class right now in the decorator (this is something I may add in the future), so you’ll also have to roll your own actor decorator for now if you go with this approach:

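from dramatiq import get_broker

def actor(fn=None, *, actor_name=None, queue_name="default", priority=0, broker=None, **options):
    def decorator(fn):
        # Fall back to the function name and the global broker when
        # they are not given explicitly.
        return TransactionalActor(
            fn, actor_name=actor_name or fn.__name__, queue_name=queue_name,
            priority=priority, broker=broker or get_broker(), options=options,
        )

    if fn is None:
        return decorator
    return decorator(fn)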


Thanks! That’s a lot of code for a simple thing. I do like this solution, but integrating a custom Actor class is cumbersome for now. So perhaps I’ll wait until the feature becomes available.

For now I’ve settled on a short helper function:

  def send_message(actor, args, tm):
      # See:
      def _send(success, *args, **kwargs):
          # Send only if the transaction committed successfully.
          if success:
              actor, actor_args = args
              actor.send(*actor_args)
      current_tm = tm.get()
      current_tm.addAfterCommitHook(_send, args=(actor, args))

which is being called from the view functions:

  actor_args = (1, 2, 3)
  # Assumes pyramid_tm, which exposes the transaction manager as request.tm.
  send_message(some_actor, args=actor_args, tm=request.tm)
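For anyone who wants to try the helper without a broker or a Pyramid app, here is a rough, self-contained sketch. `StubTransaction`, `StubTransactionManager` and `RecordingActor` are hypothetical stand-ins written for this example; they only mimic the small part of the real transaction and actor interfaces that the helper touches (`get()`, `addAfterCommitHook()` and `send()`), and the `succeed` flag on `commit()` is a simplification so that a failed commit can be simulated.

```python
class StubTransaction:
    """Simplified stand-in for a transaction with after-commit hooks."""

    def __init__(self):
        self._hooks = []

    def addAfterCommitHook(self, hook, args=(), kws=None):
        self._hooks.append((hook, tuple(args), dict(kws or {})))

    def commit(self, succeed=True):
        # Hooks receive the commit status as their first argument.
        hooks, self._hooks = self._hooks, []
        for hook, args, kws in hooks:
            hook(succeed, *args, **kws)


class StubTransactionManager:
    """Stand-in for a transaction manager that hands out one transaction."""

    def __init__(self):
        self._txn = StubTransaction()

    def get(self):
        return self._txn


class RecordingActor:
    """Hypothetical actor that records messages instead of using a broker."""

    def __init__(self):
        self.sent = []

    def send(self, *args):
        self.sent.append(args)


def send_message(actor, args, tm):
    def _send(success, actor, actor_args):
        # Send only if the transaction committed successfully.
        if success:
            actor.send(*actor_args)

    tm.get().addAfterCommitHook(_send, args=(actor, args))


def demo(succeed):
    tm = StubTransactionManager()
    actor = RecordingActor()
    send_message(actor, args=(1, 2, 3), tm=tm)
    sent_before_commit = list(actor.sent)  # nothing is sent while the transaction is open
    tm.get().commit(succeed=succeed)
    return sent_before_commit, actor.sent
```

Calling `demo(True)` records the message only once the commit has happened, and `demo(False)` records nothing, which is the behaviour wanted from the real after-commit hook.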