目 录CONTENT

文章目录

Celery初探

phyger
2022-06-16 / 0 评论 / 1 点赞 / 571 阅读 / 1,798 字 / 正在检测是否收录...

1、前言

在日常的开发工作中,当我们的Api中有一个动作需要很长时间才能完成的时候,我们就可以将这个动作作为一个任务交给Celery去异步执行,执行完再将结果返回给用户。在这个过程中Celery就充当了一个任务调度的角色。

以上就是Celery的一个典型使用场景,Celery是一个基于分布式消息的任务队列,支持多种并发方式。

2、快速开始

2.1、创建虚拟环境

mkdir celeryy && cd celeryy
pdm init

2.2、安装Celery

pdm add celery

2.3、安装eventlet(windows)

pip install eventlet

2.4、启动Redis

cd redis
.\redis-server.exe redis.windows.conf

2.5、编写task文件

# celeryy/celery_task.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0',backend='redis://localhost:6379/1')
@app.task
def add(x, y):
    return x + y

2.6、启动Celery应用

windows需要依赖eventlet模块启动。

cd celeryy
celery -A celery_task worker --loglevel=info -P eventlet

2.7、调用task

# celeryy/run.py
from celery_task import add
import time

t1 = time.time()

r1 = add.delay(1, 2)
r2 = add.delay(2, 4)
r3 = add.delay(3, 6)
r4 = add.delay(4, 8)
r5 = add.delay(5, 10)

r_list = [r1, r2, r3, r4, r5]
for r in r_list:
    while not r.ready():
        pass
    print(r.result)

t2 = time.time()

print('共耗时:%s' % str(t2-t1))

代码执行结果:

E:\celeryy>python run.py
3
6
9
12
15
共耗时:8.324045658111572

3、Celery原理分析

Celery原理图

简单来说,Celery就是一个分布式的任务调度和执行器。即当我们有一些耗时的任务,我们可以将这些任务托管给Celery,然后需要调用的时候,直接在业务代码中导入这个被Celery托管的任务,然后利用这些任务自带的delay方法对齐进行调用(生产者),然后这些任务就会被Celery通过broker(Redis等消息代理)通知到Worker(消费者)进行执行,最后任务的执行结果会被保存到backend中去。

需要注意的是,当我们的业务接口A执行了创建了Celery的异步任务的同时,给任务一个默认的状态。后续我们可以根据任务的key去查询任务的状态。

3.1、任务状态查询

上面的代码中,我们使用了ready方法去检查任务状态。但是任务的执行结果都是会被保存到backend中的,我们可以登录到backend中去查看任务的状态。

可以通过delay的返回值对象的id属性去获取当前task的id。

r = add.delay(1,2)
print(r.id)

根据task的id去Redis(backend)中去查询这个task的执行结果:

127.0.0.1:6379[1]> get celery-task-meta-af1f9d6b-73d9-43f9-aa99-35a6918554a4
"{\"status\": \"SUCCESS\", \"result\": 15, \"traceback\": null, \"children\": [], \"date_done\": \"2022-06-15T09:24:48.470345\", \"task_id\": \"af1f9d6b-73d9-43f9-aa99-35a6918554a4\"}"

如此,我们就完整的使用Celery达到了任务异步执行以及结果刷新的目标。

1

评论区