Python 最强大的任务调度框架 Celery!( 二 )


app = Celery(
"satori",
# 这里使用我服务器上的 Redis
# broker 用 1 号库, backend 用 2 号库
broker="redis://:maverick@82.157.146.194:6379/1",
backend="redis://:maverick@82.157.146.194:6379/2")
# 这里通过 @app.task 对函数进行装饰
# 那么之后我们便可调用 task.delay 创建一个任务
@app.task
def task(name, age):
print("准备执行任务啦")
time.sleep(3)
return f"name is {name}, age is {age}"

我们说执行任务的对象是 worker , 那么我们是不是需要创建一个 worker 呢?显然是需要的 , 而创建 worker 可以使用如下命令创建:

Python 最强大的任务调度框架 Celery!

文章插图
注意:在 5.0 之前我们可以写成 celery worker -A app ... , 也就是把所有的参数都放在子命令 celery worker 的后面 。但从 5.0 开始这种做法就不允许了 , 必须写成 celery -A app worker ... , 因为 -A 变成了一个全局参数 , 所以它不应该放在 worker 的后面 , 而是要放在 worker 的前面 。
下面执行该命令:
Python 最强大的任务调度框架 Celery!

文章插图
以上就前台启动了一个 worker , 正在等待从队列中获取任务 , 图中也显示了相应的信息 。然而此时队列中并没有任务 , 所以我们需要在另一个文件中创建任务并发送到队列里面去 。
import time
from app import task

 
# 从 app 导入 task, 创建任务, 但是注意: 不要直接调用 task
# 因为那样的话就在本地执行了, 我们的目的是将任务发送到队列里面去
# 然后让监听队列的 worker 从队列里面取任务并执行
# 而 task 被 @app.task 装饰, 所以它不再是原来的 task 了
# 我们需要调用它的 delay 方法

# 调用 delay 之后, 就会创建一个任务
# 然后发送到队列里面去, 也就是我们这里的 Redis
# 至于参数, 普通调用的时候怎么传, 在 delay 里面依旧怎么传
start = time.perf_counter()
task.delay("古明地觉", 17)
print(
time.perf_counter() - start
) # 0.11716766700000003

然后执行该文件 , 发现只用了 0.12 秒 , 而 task 里面明明 sleep 了 3 秒 。所以说明这一步是不会阻塞的 , 调用 task.delay 只是创建一个任务并发送至队列 。我们再看一下 worker 的输出信息:
Python 最强大的任务调度框架 Celery!

文章插图
可以看到任务已经被消费者接收并且消费了 , 而且调用 delay 方法是不会阻塞的 , 花费的那 0.12 秒是用在了其它地方 , 比如连接 Redis 发送任务等等 。
另外需要注意 , 函数被 @app.task 装饰之后 , 可以理解为它就变成了一个任务工厂 , 因为被装饰了嘛 , 然后调用任务工厂的 delay 方法即可创建任务并发送到队列里面 。我们也可以创建很多个任务工厂 , 但是这些任务工厂必须要让 worker 知道 , 否则不会生效 。所以如果修改了某个任务工厂、或者添加、删除了某个任务工厂 , 那么一定要让 worker 知道 , 而做法就是先停止 celery worker 进程 , 然后再重新启动 。
如果我们新建了一个任务工厂 , 然后在没有重启 worker 的情况下 , 就用调用它的 delay 方法创建任务、并发送到队列的话 , 那么会抛出一个 KeyError , 提示找不到相应的任务工厂 。
 
其实很好理解 , 因为代码已经加载到内存里面了 , 光修改了源文件而不重启是没用的 。因为加载到内存里面的还是原来的代码 , 不是修改过后的 。
 
然后我们再来看看 Redis 中存储的信息 , 1 号库用作 broker , 负责存储任务;2 号库用作 backend , 负责存储执行结果 。我们来看 2 号库:
Python 最强大的任务调度框架 Celery!


推荐阅读