环境信息:
- python 3.6.2
- Django 1.11.3
- celery 4.1.0
- django-celery-beat 1.1.1
准备工作:
- 安装redis;
- 启动redis;
- 安装celery;
- Django集成Celery;
启动celery;
以上步骤参考:「Celery」集成Django的基本配置及使用,同时接下来
django-celery-beat
的配置是在上述链接的基础上进行配置。
django-celery-beat配置:
Django中一个‘incubator’项目的目录如下:
- incubator/
- manage.py
- incubator/
- __init__.py
- settings.py
- urls.py
- mails/ # 这是一个app
- models.py
- views.py
- tasks.py
安装:
1
pip install -U django-celery-beat
如果项目中
settings.py
中设置了USE_TZ = False
,那么建议使用源码安装,有一个bug需要修改源文件的方式进行解决。随后在项目根目录,进行migrate
:1
python manage.py migrate
配置
incubator/incubator/settings.py
:1
2
3
4
5
6
7
8
9
10
11...
INSTALLED_APPS = (
...,
'django_celery_beat',
)
...
# 指定任务调度器
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'配置
incubator/incubator/celery.py
:如果在
settings.py
中设置了USE_TZ = False
,在使用django-celery-beat
的过程中可能会报错,所以需要在原本配好的基础上加一行app.now = datime
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15from __future__ import absolute_import, unicode_literals
import os
import datetime
import django
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'incubator.settings')
django.setup() # 如果 tasks 里边涉及对 model 的操作,则需要加上 django.setup()
app = Celery('incubator')
app.now = datetime.datetime.now # 解决时区的报错
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks() # 自动加载各个app下边的tasks.py解决时区报错:
按上述加了
app.now = datetime.datetime.now
之后会引发另一个报错,需要通过修改django-celery-beat
源码解决,其须修改的文件路径如下:django_celery_beat/schedulers.py
,其中ModelEntry
类中的_default_now
方法,如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16class ModelEntry(ScheduleEntry):
"""Scheduler entry taken from database row."""
...
def _default_now(self):
now = self.app.now()
# The PyTZ datetime must be localised for the Django-Celery-Beat
# scheduler to work. Keep in mind that timezone arithmatic
# with a localized timezone may be inaccurate.
# 下边这行是原来的,注释掉
# return now.tzinfo.localize(now.replace(tzinfo=None))
return now # 改成直接返回 now
...配置
incubator/mails/tasks.py
:随便写个样例
1
2
3
4
5
6
7from celery import shared_task
# @shared_task(name='new_task_name') # 重命名可以方便任务调用
def do_something(*arg, **kwargs):
pass # 后台可以灵活配置arg以及kwargs的参数,在运行的时候通过数据库传参将
django-celery-beat
新增的model,配置到后台:看
django_celery_beat.model
可以看到一共是有5个model,具体的功能可以看源码,也很容易懂,这里主要用的是IntervalSchedule
、CrontabSchedule
和PeriodicTask
。因为这里用的是xadmin
后台的框架,但后台框架不会影响定时任务的功能,就不赘述。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19import xadmin
from django_celery_beat.models import IntervalSchedule, CrontabSchedule, PeriodicTask
class PeriodicTaskAdmin(object):
list_display = ['name', 'task', 'interval', 'crontab']
class IntervalScheduleAdmin(object):
pass
class CrontabScheduleAdmin(object):
pass
xadmin.site.register(PeriodicTask, PeriodicTaskAdmin)
xadmin.site.register(IntervalSchedule, IntervalScheduleAdmin)
xadmin.site.register(CrontabSchedule, CrontabScheduleAdmin)后台配置:
首先按需配置好
interval
、crontab
:随后再新建
periodic task
:需要注意
Task name
字段,为任务的路径或者名字,如果按照上述步骤配置的话,Task name
应为mails.tasks.do_something
。如果使用了@shared_task(name='new_task_name')
进行重命名,那么Task name
应为new_task_name
另外,如果是带参的定时任务,可以通过
Arguments
、Keyword arguments
进行传参。
启动Celery
在 incubator/
下运行:
1 | celery -A incubator beat -l info |
- 注意:一定要在worker运行后,beat运行的任务才会执行
参考: