Python 最强大的任务调度框架 Celery!( 十 )


文章插图
显然启动和关闭是没有问题的 , 不过为了更好地观察到输出 , 我们还是用之前的方式 , 选择前台启动 。
然后回顾一下 celery 的架构 , 里面除了 producer 之外还有一个 celery beat , 也就是调度器 。我们调用任务工厂的 delay 方法 , 手动将任务发送到队列 , 此时就相当于 producer 。如果是设置定时任务 , 那么会由调度器自动将任务添加到队列 。
我们在 tasks 目录里面再创建一个 period_task1.py 文件 。
# celery_demo/tasks/period_task1.py
from celery.schedules import crontab
from app import app
from .task1 import task1, task2, task3, task4

 
@app.on_after_configure.connect
def period_task(sender, **kwargs):
# 第一个参数为 schedule , 可以是 float , 或者 crontab
# crontab 后面会说 , 第二个参数是任务 , 第三个参数是名字
sender.add_periodic_task(10.0, task1.s(),
name="每10秒执行一次")
sender.add_periodic_task(15.0, task2.s("task2"),
name="每15秒执行一次")
sender.add_periodic_task(20.0, task3.s(),
name="每20秒执行一次")
sender.add_periodic_task(
crontab(hour=18, minute=5, day_of_week=0),
task4.s("task4"),
name="每个星期天的18:05运行一次"
)

# celery_demo/tasks/task1.py
from app import app

@app.task
def task1():
print("我是task1")
return "task1你好"

@app.task
def task2(name):
print(f"我是{name}")
return f"{name}你好"

@app.task
def task3():
print("我是task3")
return "task3你好"

@app.task
def task4(name):
print(f"我是{name}")
return f"{name}你好"

既然使用了定时任务 , 那么一定要设置时区 。
# celery_demo/config.py
broker_url = "redis://:maverick@82.157.146.194:6379/1"
result_backend = "redis://:maverick@82.157.146.194:6379/2"
# 之前说过 , celery 默认使用 utc 时间
# 其实我们是可以手动禁用的 , 然后手动指定时区
enable_utc = False
timezone = "Asia/Shanghai"

最后是修改 app.py , 将定时任务加进去 。
from celery import Celery
import config

 
app = Celery(
__name__,
include=["tasks.task1", "tasks.period_task1"])
app.config_from_object(config)

下面就来启动任务 , 先来启动 worker , 生产上应该后台启动 , 这里为了看到信息 , 选择前台启动 。

Python 最强大的任务调度框架 Celery!

文章插图
tasks.task1 里面的 4 个任务工厂都被添加进来了 , 然后再来启动调度器 。
Python 最强大的任务调度框架 Celery!

文章插图
调度器启动之后会自动检测定时任务 , 如果到时间了 , 就发送到队列 。而启动调度器的命令如下:
Python 最强大的任务调度框架 Celery!

文章插图
根据调度器的输出内容 , 我们知道定时任务执行完了 , 但很明显定时任务本质上也是任务 , 只不过有定时功能 , 但也要发到队列里面 。然后 worker 从队列里面取出任务 , 并执行 , 那么 worker 必然会有信息输出 。
Python 最强大的任务调度框架 Celery!

文章插图
调度器启动到现在已经有一段时间了 , worker 在终端中输出了非常多的信息 。
此时我们就成功实现了定时任务 , 并且是通过定义函数、打上装饰器的方式实现的 。除此之外 , 我们还可以通过配置的方式实现 。


推荐阅读