2 Celery 执行异步任务
2.1 基本使用
2.1.1 创建异步任务文件
首先创建异步任务执行文件 celery_task.py:
python
import time
import celery
backend = "redis://localhost:6379/1"
broker = "redis://localhost:6379/2"
cel = celery.Celery("tasks", broker=broker, backend=backend)
# @cel.task 把一个普通函数,变成了一个 Celery 异步函数,让它可以被 .delay() / .apply_async() 丢到队列里由 worker 执行。
@cel.task
def send_email(name):
print(f"向 {name} 发送邮件...")
time.sleep(5) # 模拟发送邮件的耗时操作
print(f"邮件已发送给 {name}")
return "OK"
@cel.task
def send_msg(name):
print(f"向 {name} 发送消息...")
time.sleep(5) # 模拟发送消息的耗时操作
print(f"消息已发送给 {name}")
return "OK"2.1.2 创建任务执行文件
创建执行任务文件 produce_task.py:
python
from .celery_task import send_email, send_msg
result_email = send_email.delay("小明")
print(f"邮件任务 ID:{result_email.id}")
result_msg = send_msg.delay("小红")
print(f"消息任务 ID:{result_msg.id}")使用 .delay() 方法执行来创建队列,把任务交给中间件。
查看返回的对象类型:
python
print(type(result_email))
# <class 'celery.result.AsyncResult'>
print(result_email)
# 62f110e9-c4c3-44b2-bcff-fe8024e29960 (一个 UUID)说明:print() 自动调用的是 print(str(x)),而 AsyncResult 类的 __str__ 方法被重写为返回 task_id。
更精确地说:
send_email.delay("小明")返回的是一个AsyncResult对象- 这个对象的
__str__方法直接返回任务 ID - 所以输出的是那串 UUID,也就是
task_id
2.1.3 启动 Celery Worker
注意,异步任务需要启动 Celery worker 来执行:
bash
celery -A docs.project.celery.celery_task worker -l info2.1.4 查看任务执行结果
创建 result.py 文件来查看任务执行结果:
python
from celery.result import AsyncResult
from celery_task import cel
async_result = AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 将结果删除
elif async_result.failed():
print("执行失败")
elif async_result.status == "PENDING":
print("任务等待中被执行")
elif async_result.status == "RETRY":
print("任务异常后正在重试")
elif async_result.status == "STARTED":
print("任务已经开始被执行")2.2 多任务结构
2.2.1 主配置文件 celery.py
python
from celery import Celery
cel = Celery(
"celery_demo",
broker="redis://127.0.0.1:6379/1",
backend="redis://127.0.0.1:6379/2",
# 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
include=["celery_tasks.task01", "celery_tasks.task02"],
)
# 时区
cel.conf.timezone = "Asia/Shanghai"
# 是否使用UTC
cel.conf.enable_utc = False2.2.2 任务文件
task01.py:
python
import time
from celery_tasks.celery import cel
@cel.task
def send_email(res):
time.sleep(5)
return f"完成向{res}发送邮件任务"task02.py:
python
import time
from celery_tasks.celery import cel
@cel.task
def send_msg(name):
time.sleep(5)
return f"完成向{name}发送短信任务"2.2.3 任务调用文件 produce_task.py
python
from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_msg
# 立即告知celery去执行test_celery任务,并传入一个参数
result = send_email.delay("yuan")
print(result.id)
result = send_msg.delay("yuan")
print(result.id)2.2.4 结果检查文件 check_result.py
python
from celery.result import AsyncResult
from celery_tasks.celery import cel
async_result = AsyncResult(id="562834c6-e4be-46d2-908a-b102adbbf390", app=cel)
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 将结果删除,执行完成,结果不会自动删除
# async.revoke(terminate=True) # 无论现在是什么时候,都要终止
# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
elif async_result.failed():
print("执行失败")
elif async_result.status == "PENDING":
print("任务等待中被执行")
elif async_result.status == "RETRY":
print("任务异常后正在重试")
elif async_result.status == "STARTED":
print("任务已经开始被执行")2.2.5 启动 Worker
bash
celery worker -A celery_task -l info -P eventlet然后可以:
- 添加任务(执行
produce_task.py) - 检查任务执行结果(执行
check_result.py)