Is it possible to schedule recurring tasks?


Hi all!

I was curious whether it’s possible to schedule certain jobs to run on a set schedule, or whether there is a recommended recipe for achieving this, similar to Celery in some fashion.




While typing my question I thought to look in the Cookbook and found what I was looking for. Posting this in case anyone has the same question I did! :+1:



I appended this code to my normal tasks file, where I have all my other tasks that are usually called via a web service.
Do I need to run the scheduled task in a separate script on the command line?

Also, I am using Redis as a broker and I am getting the following error:

import pika
ImportError: No module named 'pika'



@MaxRichter you need to initialize dramatiq with your redis broker before you can send jobs to redis (via the scheduler or otherwise). So just do dramatiq.set_broker(RedisBroker())

I like the idea of using APScheduler with dramatiq, but it seems that one can’t use any of its persistence features that way. I’m getting a “TypeError: can’t pickle LockType objects” with the Redis broker. I’d assume it’s the same with RabbitMQ, though I haven’t tested it.
One workaround for this is to implement a proxy function that does the actual send:

@dramatiq.actor
def some_actor():
    ...

def some_actor_proxy():
    some_actor.send()  # the proxy does the actual send

scheduler = BlockingScheduler()
scheduler.add_job(some_actor_proxy, "interval", seconds=5)  # <- this works

But that is clearly ugly. Couldn’t come up with any way to make the decorated actor pickleable in a more automagical fashion so far :-/
Maybe @bogdan has a suggestion here? Loving dramatiq by the way!


The issue is likely because Actors keep a reference to their brokers and, in turn, the brokers keep references to their client instances which are unlikely to be pickle-able.
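The reference chain described above can be reproduced with the standard library alone. In this sketch, `FakeClient`, `FakeBroker`, and `FakeActor` are hypothetical stand-ins rather than dramatiq classes; the only assumption is that the real client holds something like a thread lock, which is what the "can't pickle LockType objects" message suggests.

```python
import pickle
import threading


class FakeClient:
    """Hypothetical stand-in for a broker client that owns a lock."""

    def __init__(self):
        self.lock = threading.Lock()


class FakeBroker:
    """Hypothetical stand-in for a broker holding a client instance."""

    def __init__(self):
        self.client = FakeClient()


class FakeActor:
    """Hypothetical stand-in for an actor that references its broker."""

    def __init__(self, broker, actor_name):
        self.broker = broker
        self.actor_name = actor_name


def try_pickle(obj):
    """Return None on success, or the exception raised while pickling."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return exc
    return None


actor = FakeActor(FakeBroker(), "some_actor")
error = try_pickle(actor)
# Pickling walks actor -> broker -> client -> lock and fails at the lock.
print(type(error).__name__, error)
```

Any object graph that reaches a lock, socket, or similar OS-level resource fails in the same way, regardless of how deep the reference sits.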

I’m afraid there might not be an easy solution here. You might be able to (ab)use __{get,set}state__ to come up with some sort of actor-aware proxy object/decorator like this (untested):

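import dramatiq


class make_actor_pickleable:
    """Proxy that pickles only the actor's name and re-resolves it on load."""

    def __init__(self, actor):
        self.__actor = actor

    def __getattr__(self, name):
        # Delegate everything else (send, actor_name, ...) to the wrapped actor.
        return getattr(self.__actor, name)

    def __getstate__(self):
        return {"actor_name": self.__actor.actor_name}

    def __setstate__(self, state):
        # Look the actor up again on the current broker instead of unpickling it.
        self.__actor = dramatiq.get_broker().get_actor(state["actor_name"])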

You might then be able to use that like this:

@make_actor_pickleable
@dramatiq.actor
def some_actor():
    ...

or (and this is probably better than using it as a decorator) directly inside your add_job calls:

scheduler.add_job(make_actor_pickleable(actor), "interval", seconds=5) 

Buuut… I’m not sure if that’s an improvement! :stuck_out_tongue:
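The pickle-by-name idea can be tried without dramatiq. In this standard-library sketch, the hypothetical `REGISTRY` dict stands in for the broker's actor lookup and `FakeActor` stands in for a real actor. It only shows that pickling the name and re-resolving it on load sidesteps the unpicklable lock; whether APScheduler accepts such an object as a job is not verified here.

```python
import pickle
import threading

# Hypothetical registry standing in for the broker's actor lookup.
REGISTRY = {}


class FakeActor:
    """Hypothetical actor stand-in; the lock makes it unpicklable."""

    def __init__(self, actor_name):
        self.actor_name = actor_name
        self.lock = threading.Lock()
        self.sent = []
        REGISTRY[actor_name] = self

    def send(self, *args):
        self.sent.append(args)


class PicklableActorRef:
    """Pickles only the actor's name and looks the actor up again on load."""

    def __init__(self, actor):
        self._actor = actor

    def __call__(self, *args):
        # Being callable lets the reference itself act as a job function.
        return self._actor.send(*args)

    def __getstate__(self):
        return {"actor_name": self._actor.actor_name}

    def __setstate__(self, state):
        self._actor = REGISTRY[state["actor_name"]]


actor = FakeActor("some_actor")
ref = pickle.loads(pickle.dumps(PicklableActorRef(actor)))
ref("hello")
print(actor.sent)  # prints [('hello',)]
```

The lookup in `__setstate__` only works if the actor has already been registered in the process that unpickles the reference, so the module that declares the actors has to be imported there first.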


Well, it’s a good starting point at any rate, thanks :smiley:
I don’t have any immediate need for this but I’m sure I’ll have to tackle this at some point, I’ll report back then =)