Django 与 Celery 的集成


# Django 与 Celery 的集成

本项目的环境为:

  • CentOS 7
  • Python 3.8
  • Django 4
  • Celery 5
  • Redis 7

虽然是 Django 4,但没有使用什么新的特性和语法,所以同样适用于 Django 3 和 2。对于 Django 1,只有少部分路由语法不一样。

# 背景

Django 框架请求/响应的过程是同步的,框架本身无法实现异步响应。

但是我们在项目过程中会经常会遇到一些耗时的任务, 比如:发送邮件、发送短信、大数据统计等等,这些操作耗时长,同步执行对用户体验非常不友好,那么在这种情况下就需要实现异步执行。

关于 Celery 介绍和使用可以查看之前写的一篇 Python 中使用 Celery 实现任务调度

# 工作流程

先来看一下 Celery 集成到 Django 后的整个工作流程图:

Django - Celery 工作流程图,图来源于网络

(Django - Celery 工作流程图,图来源于网络)

  • 客户端发送请求到 Django。
  • Django 产生任务(要执行的函数),并把任务转发给消息队列(Celery 的 Broker)
  • Celery 的 Worker 从 Broker 拿到任务并且执行。
  • Worker 执行任务后将结果保存到后端数据库(Redis 或 Django ORM 等,也可以不存,具体看怎么配置)。

在这里我们需要用到的几个核心依赖包:

  • celery:Celery 核心包肯定要安装
  • redis:这里使用 Redis 数据库作为消息中间件(Broker)
  • django-celery-beat:能够将定时任务写到数据库,并对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度,实现动态添加定时任务
  • django-celery-results:将 Celery 处理结果进行 ORM 存储(存到 Django 数据库中)

# 初始化项目

# 初始化 Django 工程

用 PyCharm 创建一个 Django 项目 django_celery_demo,可以看到基于我本地的 Python 版本,默认给我使用的是 Django 4。

接下来使用命令行创建应用 celery_app

# 新建一个 celery 应用,名为 celery_app
python manage.py startapp celery_app
1
2

此时项目目录结构如下:

django_celery_demo/
├── celery_demo/
│   │── migrations/
│   │── __init__.py
│   │── admin.py
│   │── apps.py
│   │── models.py
│   │── tests.py
│   └── views.py
├── django_celery_demo/
│   │── __init__.py
│   │── asgi.py
│   │── settings.py
│   │── urls.py
│   └── wsgi.py
└── manage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

在项目配置文件 settings.py 中将刚刚创建的应用 celery_app 添加到 INSTALLED_APPS 中:









 


INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    # my apps
    'celery_app'
]
1
2
3
4
5
6
7
8
9
10

# 配置 Celery 和插件

安装 Celery 所需的依赖包,也就是一开始提到的四个:

pip install celery
pip install redis
pip install django-celery-beat
pip install django-celery-results
1
2
3
4

django_celery_demo/ 目录中(注意是项目配置文件 settings.py ,不是项目根目录)创建 celery.py 模块:













 





django_celery_demo/
├── celery_demo/
│   │── migrations/
│   │── __init__.py
│   │── admin.py
│   │── apps.py
│   │── models.py
│   │── tests.py
│   └── views.py
├── django_celery_demo/
│   │── __init__.py
│   │── asgi.py
│   │── celery.py
│   │── settings.py
│   │── urls.py
│   └── wsgi.py
└── manage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

celery.py 的代码内容如下:

import os
from celery import Celery
from django.conf import settings

# 为 celery 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings')

# 创建应用
app = Celery("django_celery_demo")
# 配置应用
# namespace 表示所有与 celery 相关的配置键都有一个 CELERY_ 的前缀 
app.config_from_object('django.conf:settings', namespace='CELERY')
# 从已经安装的 app 中加载任务模块
app.autodiscover_tasks()
1
2
3
4
5
6
7
8
9
10
11
12
13
14

接着在同级目录下的 __init__.py(即 django_celery_demo/django_celery_demo/__init__.py)里导入刚才创建的应用,这样就能确保这个 Celery 应用在 Django 启动时被加载,以便 @shared_task 装饰器能使用它:

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
1
2
3
4
5

在项目配置文件 settings.py 中将插件作为应用添加到 INSTALLED_APPS 中:











 
 


INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    # my apps
    'celery_app',
    # 3rd apps
    'django_celery_results',  # 注意此处应用名为下划线
    'django_celery_beat',     # 注意此处应用名为下划线
]
1
2
3
4
5
6
7
8
9
10
11
12
13

同时在 setting.py 中添加 celery 相关配置,指定 Broker 和 Backend。这里我们使用 Redis 作为 Broker,Celery 执行结果也暂时存在 Redis 中:

""" Celery Config Begin """
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#new-lowercase-settings
# broker 配置,使用 Redis 作为消息中间件
CELERY_BROKER_URL = 'redis://:mypassword@127.0.0.1:6379/1'
# backend 配置,这里使用 Redis
CELERY_RESULT_BACKEND = 'redis://:mypassword@127.0.0.1:6379/2'
# 后端存储任务结果过期时间,秒
CELERY_RESULT_EXPIRES = 60 * 60 * 24
# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'
""" Celery Config End """
1
2
3
4
5
6
7
8
9
10
11

当然了,如果想把 Celery 执行结果存储在 ORM 中,那么就把 backend 配置为 django-db,跟该 Django 项目使用的 DATABASES 关联起来(Django 默认的是 sqlite3,可以改成其他的,这里就不展开了):





 
 




""" Celery Config Begin """
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#new-lowercase-settings
# broker 配置,使用 Redis 作为消息中间件
CELERY_BROKER_URL = 'redis://:mypassword@127.0.0.1:6379/1'
# backend 配置,使用 Django 数据库存储任务执行结果
CELERY_RESULT_BACKEND = 'django-db'
# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'
""" Celery Config End """
1
2
3
4
5
6
7
8
9

小贴士

关于 Celery 的配置项,网上很多文章都是比较老的,建议参考官方文档的 New lowercase settings (opens new window),会给出新老版本字段写法的对照。

另外值得一提的是(也是官方推荐的),虽然 Celery 官方文档中的配置项都是小写的,但由于前面我们在创建 celery 应用时,指定了 namespace='CELERY',所以在这个 Django 项目的 settings.py 里,我们要先把官网中的那些配置名变成大写,然后统一加上 CELERY_ 的前缀。

# 创建异步任务

# 编写 tasks.py 模块

在子应用下建立各自对应的任务文件 tasks.py(必须是 tasks.py 这个名字,不允许修改),而我们这里就是在 celery_app 应用中创建 tasks.py 模块:








 











django_celery_demo/
├── celery_demo/
│   │── migrations/
│   │── __init__.py
│   │── admin.py
│   │── apps.py
│   │── models.py
│   │── tasks.py
│   │── tests.py
│   └── views.py
├── django_celery_demo/
│   │── __init__.py
│   │── asgi.py
│   │── celery.py
│   │── settings.py
│   │── urls.py
│   └── wsgi.py
└── manage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

我们在 tasks.py 文件内创建一个任务函数 my_task

from django_celery_demo.celery import app
import time


# 加上 app 对象的 task 装饰器
# 此函数为任务函数
@app.task
def my_task():
    print("任务开始执行....")
    time.sleep(5)
    print("任务执行结束....")
1
2
3
4
5
6
7
8
9
10
11

这里把普通函数封装成 celery 的任务函数,可以使用的装饰器有两个:@app.task@shared_task

它们的区别是: shared_task 的源码 多了一个线程锁,我的理解是多进程情况下,控制任务分发时的资源竞争问题。

当我们使用 @app.task 装饰器定义我们的异步任务时,那么这个任务依赖于根据项目名 django_celery_demo 生成的这个 celery 实例:

app = Celery("django_celery_demo")

@app.task(bind=True)
def debug_task(self):
    print("Request: {0!r}".format(self.request))
1
2
3
4
5

然而我们在进行 Django 开发时为了保证每个 app 的可重用性,我们经常会在每个 app 文件夹下编写各自的异步任务,这些任务并不依赖于具体的 Django 项目名。使用 @shared_task 装饰器能让我们避免对某个项目名对应 Celery 实例的依赖,使 app 的可移植性更强:

from celery import shared_task

@shared_task
def add(x, y):
    return x + y
1
2
3
4
5

换言之,由 shared_task 装饰的任务可以被任何 app 调用。

# 编写任务调用接口

在子应用下的视图文件中创建任务调用接口,而我们这里就是在 celery_app/views.py 中创建视图 index

from django.http import HttpResponse
from celery.result import AsyncResult
from .tasks import my_task


def index(request):
    # 将 my_task 任务加入到 celery 队列中
    # 如果 my_task 函数有参数,可通过 delay() 传递
    # 例如 my_task.delay(10, 20)
    res = my_task.delay()
    result = AsyncResult(res.task_id)

    return HttpResponse(
        "<h1>服务器返回响应内容!</h1><h2>status: {0}</h2><h2>task_id: {1}</h2>"
        .format(result.status, result.task_id)
    )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 编写 url 路由

django_celey_demo/settings.py 配置视图路由:



 



 


from django.contrib import admin
from django.urls import path
from celery_app.views import index

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', index),
]
1
2
3
4
5
6
7
8

# 查看异步任务执行

# 首次运行初始化

首先进行 Django 工程运行前的初始化工作:

python manage.py makemigrations
python manage.py migrate
1
2

可以看到自动生成了一些表,其中以 django_celery_beat_* 开头的表名就是定时器信息相关的数据库表,由 django_celery_beat 这个 app 创建。

django_celery_results_* 开头的数据库表存储了任务执行结果的相关信息,由 django_celery_results 这个 app 创建,如果 backend 使用的是 Django 数据库,就会往里面写入数据,否则就是空表(因为我们 INSTALL 了这个 app,所以就创建出了表)。

接下来创建后台管理员帐号,后面可用于登录管理后台 Admin:

python manage.py createsuperuser
1

然后别忘了往 settings.py 文件的 ALLOWED_HOSTS = [] 里添加一下允许访问的 IP,不然启动了 Django 服务后你也没法访问页面。

# 运行 Django 工程

现在可以运行 Django 工程了。

python manage.py runserver
1

访问 ip:port/admin/ 可以进入管理后台,目前有如下配置项:

Django Celery 管理后台

(Django Celery 管理后台)

# 启动 worker

创建 worker 等待处理 celery 队列中任务,在终端执行命令(需要在项目根目录下执行):

# celery -A 你的工程名 worker -l info
celery -A django_celery_demo worker -l info
1
2

可以看到打印出了如下信息:

启动 worker

(启动 worker)

# 执行一次异步任务

按照我们前面写的路由,访问首页 ip:port/ 即可调用 index 视图下的异步任务 my_task

访问首页调用 index 视图下的异步任务

(访问首页调用 index 视图下的异步任务)

# 查看存储的任务结果

如果之前使用 django_celery_results 将任务执行结果存储在 Django 数据库的话,那么可以直接从管理后台 Admin 里查询任务执行结果。

不过这里我是将 backend 设置成了 Redis,所以就通过命令行简单看一下:

异步任务执行结果

(异步任务执行结果)

# 创建定时任务

# 编写一些任务

为了方便演示,往 django_celery_demo/celery_app/tasks.py 中再创建一些任务:














 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

from django_celery_demo.celery import app
import time


# 加上 app 对象的 task 装饰器
# 此函数为任务函数
@app.task
def my_task():
    print("任务开始执行....")
    time.sleep(5)
    print("任务执行结束....")


# 用于定时执行的任务
@app.task(bind=True)
def interval_task(self, *args, **kwargs):
    task_context = self.request.__dict__
    desc = args[0] if args else ''
    print('task name: {0}, desc: {1}'.format(
        task_context['properties']['periodic_task_name'], desc)
    )


# 用于定时执行的任务
@app.task(bind=True)
def crontab_task(self, *args, **kwargs):
    task_context = self.request.__dict__
    desc = args[0] if args else ''
    print('task name: {0}, desc: {1}'.format(
        task_context['properties']['periodic_task_name'], desc)
    )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 定时任务配置

  • 要使用定时任务,需要安装额外包 django-celery-beat(一开始就已经做过了)
  • django_celery_beat 这个 app 加入到 settings.py 的 INSTALLED_APPS(一开始也已经做过了)
  • 接下来就是增加定时任务配置

回到 settings.py,在 Celery 的配置中增加这么一行:











 
 


""" Celery Config Begin """
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#new-lowercase-settings
# broker 配置,使用 Redis 作为消息中间件
CELERY_BROKER_URL = 'redis://:mypassword@127.0.0.1:6379/1'
# backend 配置,这里使用 Redis
CELERY_RESULT_BACKEND = 'redis://:mypassword@127.0.0.1:6379/2'
# 后端存储任务结果过期时间,秒
CELERY_RESULT_EXPIRES = 60 * 60 * 24
# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'
# 配置定时器模块,定时器信息存储在数据库中
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
""" Celery Config End """
1
2
3
4
5
6
7
8
9
10
11
12
13

# 增加定时任务计划

以前我们需要在 settings.py 中通过硬编码来定义定时规则和对应的任务,而现在引入了 django-celery-beat 这个插件后,就可以实现动态添加定时任务了。

启动 Django,登录管理后台 Admin:

Django Celery 管理后台

(Django Celery 管理后台)

其中「Crontabs」创建的执行规则是通过 crontab (opens new window) 语法来定义的(特定时间,比如某时某分);「Intervals」创建的执行规则是通过纯数字和时间单位来定义的(每隔多久执行)。

  • 比如我们要创建每隔 5 秒执行的规则,就可以在 Intervals 表名后面点击 Add 按钮:
创建每隔 5 秒执行的规则

(创建每隔 5 秒执行的规则)

  • 比如我们要创建每整分钟执行小时:分钟:00)的规则,就可以在 Crontabs 表名后面点击 Add 按钮:
创建每整分钟执行的规则

(创建每整分钟执行的规则)

根据实际需要通过上述两种方式定义完规则后,具体哪个任务要采取那种规则来执行,在「Periodic tasks」表中创建:

创建定时任务:绑定具体的任务函数和定时计划

(创建定时任务:绑定具体的任务函数和定时计划)

这里还支持很多配置项,比如参数传递等,具体可以尝试一下。

# 查看异步任务执行

# 运行 Django 工程

python manage.py runserver
1

# 启动 worker

# celery -A 你的工程名 worker -l info
celery -A django_celery_demo worker -l info
1
2

# 启动 beat

# celery -A 你的工程名 beat -l info
celery -A django_celery_demo beat -l info
1
2

# 观察定时任务执行

从 worker 的输出可以看到:

定时任务 - worker 输出信息

(定时任务 - worker 输出信息)

从 beat 的输出可以看到:

定时任务 - beat 输出信息

(定时任务 - beat 输出信息)

# 可视化监控 worker 状态

Celery 提供了⼀个工具 flower,将各个任务的执行情况、各个 worker 的健康状态进行监控并以可视化的方式展现。

安装:

pip install flower
1

启动 flower:

celery -A django_celery_demo flower --address=192.168.10.50 --port=5555 --basic_auth=user:password
1

查看 flower:

http://192.168.10.50:5555
1

界面如下:

flower 界面

(flower 界面)

# 使用 supervisor 进行管理

上面的 worker、beat 和 flower 都是在命令行前台启动的,虽然也可以改成后台启动,但是如果因为某个意外导致进程被杀掉了,那么整个服务就不可用了。因此实际生产中我们会使用 supervisor 进行管理。

supervisor 的配置文件(用到了虚拟环境,所以有些命令指向了虚拟环境中):

[program:celery-worker]
numprocs=1
user={your-username}
directory=/home/{your-username}/django_celery_demo/
command=venv/bin/celery -A django_celery_demo worker -l info -c 1000
redirect_stderr=true
autostart=true
autorestart=true
startretries=3
startsecs=3
stopwaitsecs=10
stopsignal=INT
stopasgroup=true
stdout_logfile=/var/log/supervisor/celery-worker.log
stderr_logfile=/var/log/supervisor/celery-worker.log

[program:celery-beat]
numprocs=1
user={your-username}
directory=/home/{your-username}/django_celery_demo/
command=venv/bin/celery -A django_celery_demo beat -l info -S django
redirect_stderr=true
autostart=true
autorestart=true
startretries=3
startsecs=3
stopwaitsecs=10
stopsignal=INT
stopasgroup=true
stdout_logfile=/var/log/supervisor/celery-beat.log
stderr_logfile=/var/log/supervisor/celery-beat.log

[program:celery-flower]
numprocs=1
user={your-username}
directory=/home/{your-username}/django_celery_demo/
command=venv/bin/celery -A django_celery_demo flower --address=127.0.0.1 --port=5555 --basic_auth=user:password
redirect_stderr=true
autostart=true
autorestart=true
startretries=3
startsecs=3
stopwaitsecs=10
stopsignal=INT
stopasgroup=true
stdout_logfile=/var/log/supervisor/celery-flower.log
stderr_logfile=/var/log/supervisor/celery-flower.log
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

需要注意不能用 root 用户去启动 celery 进程,否则可能会报错,出问题。

# 完整代码

完整代码我放到 GitHub 了,详见 django_celery_demo (opens new window)

# 参考文档

(完)