Python Celery 教學 (3) - Workflow
Last updated on Nov 22, 2023 in Python 模組/套件推薦 , Python 程式設計 - 高階 by Amo Chen ‐ 2 min read
本篇將延續 Python Celery 教學 (2) - AsyncResult 與執行工作 的內容,進一步介紹 2 種常見 Celery 定義的 Workflow:
- chain
- group
Celery 在 Signature 的基礎上提供了若干種內建的 Workflow ,例如 chain
, group
等等,所以能夠發現以下的 Workflow 都會用 Signature 進行傳遞。
chain
熟悉使用 Linux 指令的朋友們一定都很熟悉 pipe ( |
) 的用法, pipe 可以將一個指令的輸出結果,直接輸入至下一個指令處理。而這種連結的方式在 Celery 稱為 chains
,所以可以用 chain
方法將多個 tasks 連結起來,一個接著一個地執行。
以下範例定義簡單的加法與乘法 tasks :
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from celery import Celery
app = Celery(
'tasks',
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/1',
)
@app.task
def add(x, y):
print('x={}, y={}, x+y={}'.format(x, y, x+y))
return x+y
@app.task
def mul(x, y):
print('x={}, y={}, x*y={}'.format(x, y, x*y))
return x*y
if __name__ == '__main__':
app.worker_main()
我們可以利用 chain
做出類似以下 Linux pipe 的效果:
add -x 2 -y 2 | mul -x 10
# 40
以下就是用 chain
將 tasks 連結起來的範例:
>>> from celery import chain
>>> from tasks import add, mul
>>> res = chain(add.s(2, 2), mul.s(10))
>>> type(res)
<class 'celery.canvas._chain'>
>>> res
tasks.add(2, 2) | mul(10)
>>> res().get()
40
上述範例可以看到 chain 會回傳 celery.canvas._chain
類別,如果把它簡單印出來就會發現 tasks.add(2, 2) | mul(10)
是跟 pipe 很像的表示方法,所以上述範例還可以改寫成:
>>> res = (add.s(2, 2) | mul.s(10)).apply_async()
>>> res.get()
40
接著呼叫 res()
執行之後,再用 get()
方法取得最後結果。
group
group
提供將多個 tasks 平行執行的功能。
A group can be used to execute several tasks in parallel.
group
很適合相同工作但是不同參數的 tasks ,例如縮圖功能就很適合應用在 group ,當使用者上傳圖片後,平行將圖片針對各種裝置(手機、桌機)轉成多種不同大小尺寸。
例如以下假想的縮圖 task 範例:
@app.task
def resize_photo(photo, w, h):
print('w={}, h={}'.format(w, h))
接著利用 group
同時產生多種不同尺寸的圖片的程式碼就如以下,用 group
包裝後再呼叫 apply_async()
方法,交由 Workers 執行:
>>> from celery import group
>>> from tasks import resize_photo
>>> photo = 'fake_photo'
>>> jobs = group([
... resize_photo.s(photo, 80, 80),
... resize_photo.s(photo, 160, 160),
... ])
>>> res = jobs.apply_async()
<GroupResult: 4d6c76f8-cdcb-42db-9fa3-8440698fc85f [62a7049e-3300-44f8-886c-e4ded65f6265, c2fd58b4-91bf-4406-9ca6-9d10634595c2]>
同樣地,執行後會取得 GroupResult
object ,此 object 也提供若干方法讓我們能夠知道 tasks 的執行情況,例如:
>>> res.ready() # 是否所有 tasks 都已執行完畢
False
>>> res.successful() # 是否所有 tasks 都執行成功
False
>>> res.waiting() # 是否有任ㄧ tasks 正在等待執行
True
>>> res.completed_count() # 取得完成執行的 tasks 數量
0
這邊需要注意的是預設 serializer 是 JSON ,所以傳遞參數到 tasks 時,必須是 JSON serializable 的物件才行,否則會造成類似以下的 EncodeError
:
kombu.exceptions.EncodeError: Object of type 'object' is not JSON serializable
小結
除了本篇介紹的 chain
與 group
之外, Celery 還提供了另外幾種 Workflow :
- chord
- map
- starmap
- chunk
有興趣的話可以進一步參閱 文件 ,說不定可以意外發現適合的 Workflow 解決長久以來的困擾。
References
http://docs.celeryproject.org/en/latest/userguide/canvais.html