Python 最强大的任务调度框架 Celery!( 五 )


"routing_key": "default"
},
# 凡是 topic 开头的 routing key
# 都会被放到这个队列
"topicqueue": {
"routing_key": "topic.#",
"exchange": "topic_exchange",
"exchange_type": "topic",
},
"task_eeg": { # 设置扇形交换机
"exchange": "tasks",
"exchange_type": "fanout",
"binding_key": "tasks",
}, 
celery 的配置非常多 , 不止我们上面说的那些 , 更多配置可以查看官网 , 写的比较详细 。
 

https://docs.celeryq.dev/en/latest/userguide/configuration.html#general-settings
 
值得一提的是 , 在 5.0 之前配置项都是大写的 , 而从 5.0 开始配置项改成小写了 。不过老的写法目前仍然支持 , 只是启动的时候会抛警告 , 并且在 6.0 的时候不再兼容老的写法 。
Python 最强大的任务调度框架 Celery!

文章插图
官网也很贴心地将老版本的配置和新版本的配置罗列了出来 , 尽管配置有很多 , 但并不是每一个都要用 , 可以根据自身的业务合理选择 。
然后下面我们就根据配置文件的方式启动 celery , 当前目录结构如下:
Python 最强大的任务调度框架 Celery!

文章插图
celery_demo/config.py
broker_url = "redis://:maverick@82.157.146.194:6379/1"
result_backend = "redis://:maverick@82.157.146.194:6379"
# 写俩就完事了

celery_demo/tasks/task1.py
celery 可以支持非常多的定时任务 , 而不同种类的定时任务我们一般都会写在不同的模块中(当然这里目前只有一个) , 然后再将这些模块组织在一个单独的目录中 。
当前只有一个 task1.py , 我们随便往里面写点东西 , 当然你也可以创建更多的文件 。
def add(x, y):
return x + y

 
def sub(x, y):
return x - y

def mul(x, y):
return x * y

def div(x, y):
return x / y

celery_demo/app.py
from celery import Celery
import config
from tasks.task1 import (
add, sub, mul, div

 
# 指定一个 name 即可
app = Celery("satori")
# 其它参数通过加载配置文件的方式指定
# 和 flask 非常类似
app.config_from_object(config)

# 创建任务工厂 , 有了任务工厂才能创建任务
# 这种方式和装饰器的方式是等价的
add = app.task(add)
sub = app.task(sub)
mul = app.task(mul)
div = app.task(div)

然后重新启动 worker:
Python 最强大的任务调度框架 Celery!

文章插图
输出结果显示 , 任务工厂都已经被加载进来了 , 然后我们创建任务并发送至队列 。
# 在 celery_demo 目录下
# 将 app.py 里面的任务工厂导入进来
>>> from app import add, sub, mul, div
# 然后创建任务发送至队列 , 并等待结果
>>> add.delay(3, 4).get()
7
>>> sub.delay(3, 4).get()
-1
>>> mul.delay(3, 4).get()
12
>>> div.delay(3, 4).get()
0.75

结果正常返回了 , 再来看看 worker 的输出 , 
Python 最强大的任务调度框架 Celery!

文章插图
多个任务都被执行了 。
发送任务时指定参数
我们在发送任务到队列的时候 , 使用的是 delay 方法 , 里面直接传递函数所需的参数即可 , 那么除了函数需要的参数之外 , 还有没有其它参数呢?
首先 delay 方法实际上是调用的 apply_async 方法 , 并且 delay 方法里面只接收函数的参数 , 但是 apply_async 接收的参数就很多了 , 我们先来看看它们的函数原型:


推荐阅读