redisbeat is a Celery Beat scheduler that stores periodic tasks and their status in a Redis datastore.
Tasks can be added, removed, or modified without restarting Celery, so you can register a new scheduled task dynamically whenever you need one.
- Full-featured celery-beat scheduler.
- Dynamically add/remove/modify tasks.
redisbeat can be easily installed using setuptools or pip.
# pip install redisbeat
or you can install from source by cloning this repository:
# git clone https://github.com/liuliqiang/redisbeat.git
# cd redisbeat
# python setup.py install
redisbeat provides a Docker demo in the example folder that you can use:
# cd redisbeat/example
# docker-compose up -d
Once Compose is running, you can watch it work with the following commands:
- Celery worker logs
# docker-compose logs worker
- Celery beat logs
# docker-compose logs beat
- Dynamically add the task sub
# docker exec -it beat python add_task.py
- Dynamically remove the task sub
# docker exec -it beat python rem_task.py
If you want to try it locally, install the requirements with pip and run it as a Python project, changing the Redis host from 'redis' to 'localhost' in the Celery instance and config in tasks.py:
#(...)
app = Celery('tasks', backend='redis://redis:6379',
             broker='redis://redis:6379')
app.conf.update(
    CELERY_REDIS_SCHEDULER_URL='redis://redis:6379',
#(...)
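The host swap described above can also be computed instead of edited by hand. The following is a minimal sketch using only the standard library; `with_host` is a hypothetical helper and not part of redisbeat or Celery.

```python
from urllib.parse import urlsplit, urlunsplit


def with_host(url, host):
    """Return `url` with its hostname replaced; scheme, credentials, port and path are kept."""
    parts = urlsplit(url)
    # Preserve any "user:password@" prefix in front of the host.
    userinfo, sep, _ = parts.netloc.rpartition("@")
    netloc = f"{userinfo}{sep}{host}"
    if parts.port is not None:
        netloc += f":{parts.port}"
    return urlunsplit(parts._replace(netloc=netloc))


# Docker Compose service name -> local Redis instance
local_url = with_host('redis://redis:6379', 'localhost')
print(local_url)
```

The same value would then be used for the backend, the broker, and CELERY_REDIS_SCHEDULER_URL.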
Commands to start worker and beat:
# celery worker -A tasks -l INFO
# celery beat -A tasks -S redisbeat.RedisScheduler -l INFO
Configuration for redisbeat is similar to the original Celery configuration for beat.
You can configure redisbeat as:
# encoding: utf-8
from datetime import timedelta
from celery.schedules import crontab
from celery import Celery
app = Celery('tasks', backend='redis://localhost:6379',
             broker='redis://localhost:6379')

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'perminute': {
            'task': 'tasks.add',
            'schedule': timedelta(seconds=3),
            'args': (1, 1)
        }
    }
)

@app.task
def add(x, y):
    return x + y

@app.task
def sub(x, y):
    return x - y
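Despite the entry name 'perminute', a schedule of timedelta(seconds=3) describes a three-second interval. The arithmetic can be sketched as follows; `next_runs` is a hypothetical helper, not part of Celery or redisbeat, and it does not model how beat actually tracks due times.

```python
from datetime import datetime, timedelta


def next_runs(start, every, count):
    """Return the first `count` due times of a fixed-interval schedule."""
    return [start + every * i for i in range(1, count + 1)]


start = datetime(2024, 1, 1, 12, 0, 0)  # arbitrary illustrative start time
runs = next_runs(start, timedelta(seconds=3), 3)
for run in runs:
    print(run.time())
```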
When you want to add a new task dynamically, you can use code like the following in __main__:
#!/usr/bin/env python
# encoding: utf-8
from datetime import timedelta
from celery import Celery
from redisbeat.scheduler import RedisScheduler
app = Celery('tasks', backend='redis://localhost:6379',
             broker='redis://localhost:6379')

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'perminute': {
            'task': 'tasks.add',
            'schedule': timedelta(seconds=3),
            'args': (1, 1)
        }
    }
)

@app.task
def add(x, y):
    return x + y

@app.task
def sub(x, y):
    return x - y

if __name__ == "__main__":
    # Build a scheduler bound to the Celery app, then register a new entry.
    scheduler = RedisScheduler(app=app)
    scheduler.add(**{
        'name': 'sub-perminute',
        'task': 'tasks.sub',
        'schedule': timedelta(seconds=3),
        'args': (1, 1)
    })
Adding a task takes two steps:
- Create a RedisScheduler object from the Celery app
- Add new tasks through the RedisScheduler object
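When several tasks are added this way, the entry dictionary can be built by a small function. This is a sketch only: `make_entry` is a hypothetical helper, not part of redisbeat, and it assumes every entry uses a fixed interval in seconds.

```python
from datetime import timedelta


def make_entry(name, task, every_seconds, args=()):
    """Build the keyword arguments for one periodic-task entry."""
    if every_seconds <= 0:
        raise ValueError("every_seconds must be positive")
    return {
        'name': name,
        'task': task,
        'schedule': timedelta(seconds=every_seconds),
        'args': tuple(args),
    }


entry = make_entry('sub-perminute', 'tasks.sub', 3, args=(1, 1))
# With a RedisScheduler created from the Celery app (step 1),
# step 2 would then be: scheduler.add(**entry)
```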
Alternatively, you can define the settings in your Celery configuration file, like any other configuration:
from datetime import timedelta

CELERY_BEAT_SCHEDULER = 'redisbeat.RedisScheduler'
CELERY_REDIS_SCHEDULER_URL = 'redis://localhost:6379/1'
CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_tasks'
CELERYBEAT_SCHEDULE = {
    'perminute': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=3),
        'args': (1, 1)
    }
}
When running redisbeat in a multi-node deployment, it uses a Redis lock to prevent the same task from being executed multiple times.
CELERY_REDIS_MULTI_NODE_MODE = True
CELERY_REDIS_SCHEDULER_LOCK_TTL = 30
This is an experimental feature. To use redisbeat in a production environment, set CELERY_REDIS_MULTI_NODE_MODE = False so that redisbeat does not use it.
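To illustrate why a lock with a TTL keeps two beat nodes from dispatching the same task, here is a minimal in-memory sketch. It is not redisbeat's implementation: `InMemoryLock`, its method, and the node names are hypothetical, and the TTL value of 30 is assumed to be in seconds.

```python
import time


class InMemoryLock:
    """Stand-in for a Redis key that is set only if absent and expires after a TTL."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self.owner = None
        self.expires_at = 0.0

    def acquire(self, node):
        now = self.clock()
        held = self.owner is not None and now < self.expires_at
        if held and self.owner != node:
            return False  # another node holds a live lock
        # Take the lock, or renew it if this node already holds it.
        self.owner = node
        self.expires_at = now + self.ttl
        return True


fake_now = [0.0]  # controllable clock so the example runs instantly
lock = InMemoryLock(ttl_seconds=30, clock=lambda: fake_now[0])

first_round = [lock.acquire("beat-1"), lock.acquire("beat-2")]
fake_now[0] = 31.0  # beat-1 stopped renewing; its lock has expired
after_expiry = lock.acquire("beat-2")
```

Only the node that holds the lock dispatches tasks. If that node dies, the TTL lets another node take over instead of leaving the lock held forever.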