「Celery」django-celery-beat配置定时任务

环境信息:

  • python 3.6.2
  • Django 1.11.3
  • celery 4.1.0
  • django-celery-beat 1.1.1

准备工作:

  1. 安装redis;
  2. 启动redis;
  3. 安装celery;
  4. Django集成Celery;
  5. 启动celery;

    以上步骤参考:「Celery」集成Django的基本配置及使用,同时接下来django-celery-beat的配置是在上述链接的基础上进行配置。

django-celery-beat配置:

Django中一个‘incubator’项目的目录如下:

- incubator/
  - manage.py
  - incubator/
    - __init__.py
    - settings.py
    - urls.py
  - mails/  # 这是一个app
    - models.py
    - views.py
    - tasks.py
  1. 安装:

    1
    pip install -U django-celery-beat

    如果项目中settings.py中设置了USE_TZ = False,那么建议使用源码安装,有一个bug需要修改源文件的方式进行解决。随后在项目根目录,进行migrate

    1
    python manage.py migrate
  2. 配置 incubator/incubator/settings.py:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    ...

    INSTALLED_APPS = (
    ...,
    'django_celery_beat',
    )

    ...

    # 指定任务调度器
    CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'
  3. 配置 incubator/incubator/celery.py

    如果在settings.py中设置了USE_TZ = False,在使用django-celery-beat的过程中可能会报错,所以需要在原本配好的基础上加一行app.now = datime

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    from __future__ import absolute_import, unicode_literals
    import os
    import datetime

    import django
    from celery import Celery


    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'incubator.settings')
    django.setup() # 如果 tasks 里边涉及对 model 的操作,则需要加上 django.setup()

    app = Celery('incubator')
    app.now = datetime.datetime.now # 解决时区的报错
    app.config_from_object('django.conf:settings', namespace='CELERY')
    app.autodiscover_tasks()  # 自动加载各个app下边的tasks.py
  4. 解决时区报错:

    按上述加了app.now = datetime.datetime.now之后会引发另一个报错,需要通过修改django-celery-beat源码解决,其须修改的文件路径如下:django_celery_beat/schedulers.py,其中ModelEntry类中的_default_now方法,如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    class ModelEntry(ScheduleEntry):
    """Scheduler entry taken from database row."""

    ...

    def _default_now(self):
    now = self.app.now()
    # The PyTZ datetime must be localised for the Django-Celery-Beat
    # scheduler to work. Keep in mind that timezone arithmatic
    # with a localized timezone may be inaccurate.

    # 下边这行是原来的,注释掉
    # return now.tzinfo.localize(now.replace(tzinfo=None))
    return now # 改成直接返回 now

    ...
  5. 配置 incubator/mails/tasks.py:

    随便写个样例

    1
    2
    3
    4
    5
    6
    7
    from celery import shared_task


    # @shared_task(name='new_task_name') # 重命名可以方便任务调用
    @shared_task
    def do_something(*arg, **kwargs):
    pass # 后台可以灵活配置arg以及kwargs的参数,在运行的时候通过数据库传参
  6. django-celery-beat新增的model,配置到后台:

    django_celery_beat.model可以看到一共是有5个model,具体的功能可以看源码,也很容易懂,这里主要用的是IntervalScheduleCrontabSchedulePeriodicTask。因为这里用的是xadmin后台的框架,但后台框架不会影响定时任务的功能,就不赘述。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import xadmin
    from django_celery_beat.models import IntervalSchedule, CrontabSchedule, PeriodicTask


    class PeriodicTaskAdmin(object):
    list_display = ['name', 'task', 'interval', 'crontab']


    class IntervalScheduleAdmin(object):
    pass


    class CrontabScheduleAdmin(object):
    pass


    xadmin.site.register(PeriodicTask, PeriodicTaskAdmin)
    xadmin.site.register(IntervalSchedule, IntervalScheduleAdmin)
    xadmin.site.register(CrontabSchedule, CrontabScheduleAdmin)
  7. 后台配置:

    首先按需配置好intervalcrontab

    interval-list

    crontab-list

    随后再新建periodic task

    crontab-list

    需要注意Task name字段,为任务的路径或者名字,如果按照上述步骤配置的话,Task name应为mails.tasks.do_something。如果使用了@shared_task(name='new_task_name')进行重命名,那么Task name应为new_task_name

    另外,如果是带参的定时任务,可以通过ArgumentsKeyword arguments进行传参。

启动Celery

incubator/ 下运行:

1
celery -A incubator beat -l info
  • 注意:一定要在worker运行后,beat运行的任务才会执行

参考: