Redis中如何配置Celery
今天就跟大家聊聊有关Redis中如何配置Celery,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
以下撇开Web框架,介绍基于Redis配置Celery任务的方法。
pip install celery[redis]
项目结构
$ tree your_project your_project ├── __init__.py ├── main.py ├── celery.py └── tasks.py 0 directories, 4 files
其中,main.py是触发Task的业务代码。当然,文件名可以随意改。celery.py是Celery的app定义的位置,tasks.py是Task定义的位置,文件名不建议修改。
配置Celery
在celery.py中写入如下代码:
from celery import Celery from .settings import REDIS_URL APP = Celery( main=__package__, broker=REDIS_URL, backend=REDIS_URL, include=[f'{__package__}.tasks'], ) APP.conf.update(task_track_started=True)
其中,REDIS_URL从同一的配置settings.py中引入, 形式大概是redis://localhost:6379/0。这里既用Redis来当broker,又用来当backend。即,既当消息队列,又当结果反馈的数据库(默认仅保存1天)。
在include=,需要填一个下游worker的包名列表。这里选择了同一个包的tasks.py文件。
额外设置的task_track_started,是命令Worker反馈STARTED状态。默认情况下,是无法知道任务什么时候开始执行的。
编写任务并调用
在tasks.py文件中,添加异步任务的实现。
from .celery import APP @APP.task def do_sth(): pass
在需要发起任务的地方,用.apply_async可以触发异步调用。即,实际只是向消息队列发送消息,真正的执行操作在远程。
from celery.result import AsyncResult from .tasks imprt do_sth result = do_sth.apply_async() assert isinstance(result, AsyncResult)
运行Worker:
celery -A your_project worker
运行原理
一次Task从触发到完成,序列图如下:
其中,main代表业务代码主进程。它可能是Django、Flask这类Web服务,也可能是一个其它类型的进程。worker就是指Celery的Worker。
main发送消息后,会得到一个AsyncResult,其中包含task_id。仅通过task_id,也可以自己构造一个AsyncResult,查询相关信息。其中,代表运行过程的,主要是state。
worker会持续保持对Redis(或其它消息队列,如RabbitMQ)的关注,查询新的消息。如果获得新消息,将其消费后,开始运行do_sth。运行完成会把返回值对应的结果,以及一些运行信息,回写到Redis(或其它backend,如Django数据库等)上。在系统的任何地方,通过对应的AsyncResult(task_id)就可以查询到结果。
Celery Task的状态
以下是状态图:
其中,除SUCCESS外,还有失败(FAILURE)、取消(REVOKED)两个结束状态。而RETRY则是在设置了重试机制后,进入的临时等待状态。
另外,如果保存在Redis的结果信息被清理(默认仅保存1天),那么任务状态又会变成PENDING。这在设计上是个巨大的问题,使用时要做对应容错。
常见控制操作
result = AsyncResult(task_id) # 阻塞等待返回 result.wait() # 取消任务 result.revoke() # 删除任务记录 result.forget()
有时,在业务主进程中需要等待异步运行的结果,这时需要使用wait。如果要取消一个排队中、或已执行的任务,则可以使用revoke。即使任务已经执行完成,也可以使用revoke,但不会有任何变化。如果需要提前删除任务记录,可以使用forget。
看完上述内容,你们对Redis中如何配置Celery有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。