Python Celery 教學 (3) - Workflow

Last updated on  Nov 22, 2023  in  Python 模組/套件推薦 , Python 程式設計 - 高階  by  Amo Chen  ‐ 2 min read

本篇將延續 Python Celery 教學 (2) - AsyncResult 與執行工作 的內容,進一步介紹 2 種常見 Celery 定義的 Workflow:

  • chain
  • group

Celery 在 Signature 的基礎上提供了若干種內建的 Workflow ,例如 chain , group 等等,所以能夠發現以下的 Workflow 都會用 Signature 進行傳遞。

chain

熟悉使用 Linux 指令的朋友們一定都很熟悉 pipe ( | ) 的用法, pipe 可以將一個指令的輸出結果,直接輸入至下一個指令處理。而這種連結的方式在 Celery 稱為 chains ,所以可以用 chain 方法將多個 tasks 連結起來,一個接著一個地執行。

以下範例定義簡單的加法與乘法 tasks :

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from celery import Celery


app = Celery(
    'tasks',
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/1',
)


@app.task
def add(x, y):
    print('x={}, y={}, x+y={}'.format(x, y, x+y))
    return x+y


@app.task
def mul(x, y):
    print('x={}, y={}, x*y={}'.format(x, y, x*y))
    return x*y


if __name__ == '__main__':
    app.worker_main()

我們可以利用 chain 做出類似以下 Linux pipe 的效果:

add -x 2 -y 2 | mul -x 10
# 40

以下就是用 chain 將 tasks 連結起來的範例:

>>> from celery import chain
>>> from tasks import add, mul
>>> res = chain(add.s(2, 2), mul.s(10))
>>> type(res)
<class 'celery.canvas._chain'>
>>> res
tasks.add(2, 2) | mul(10)
>>> res().get()
40

上述範例可以看到 chain 會回傳 celery.canvas._chain 類別,如果把它簡單印出來就會發現 tasks.add(2, 2) | mul(10) 是跟 pipe 很像的表示方法,所以上述範例還可以改寫成:

>>> res = (add.s(2, 2) | mul.s(10)).apply_async()
>>> res.get()
40

接著呼叫 res() 執行之後,再用 get() 方法取得最後結果。

group

group 提供將多個 tasks 平行執行的功能。

A group can be used to execute several tasks in parallel.

group 很適合相同工作但是不同參數的 tasks ,例如縮圖功能就很適合應用在 group ,當使用者上傳圖片後,平行將圖片針對各種裝置(手機、桌機)轉成多種不同大小尺寸。

例如以下假想的縮圖 task 範例:

@app.task
def resize_photo(photo, w, h):
    print('w={}, h={}'.format(w, h))

接著利用 group 同時產生多種不同尺寸的圖片的程式碼就如以下,用 group 包裝後再呼叫 apply_async() 方法,交由 Workers 執行:

>>> from celery import group
>>> from tasks import resize_photo
>>> photo = 'fake_photo'
>>> jobs = group([
...    resize_photo.s(photo, 80, 80),
...    resize_photo.s(photo, 160, 160),
... ])
>>> res = jobs.apply_async()
<GroupResult: 4d6c76f8-cdcb-42db-9fa3-8440698fc85f [62a7049e-3300-44f8-886c-e4ded65f6265, c2fd58b4-91bf-4406-9ca6-9d10634595c2]>

同樣地,執行後會取得 GroupResult object ,此 object 也提供若干方法讓我們能夠知道 tasks 的執行情況,例如:

>>> res.ready() # 是否所有 tasks 都已執行完畢
False
>>> res.successful() # 是否所有 tasks 都執行成功
False
>>> res.waiting() # 是否有任ㄧ tasks 正在等待執行
True
>>> res.completed_count() # 取得完成執行的 tasks 數量
0

這邊需要注意的是預設 serializer 是 JSON ,所以傳遞參數到 tasks 時,必須是 JSON serializable 的物件才行,否則會造成類似以下的 EncodeError

kombu.exceptions.EncodeError: Object of type 'object' is not JSON serializable

小結

除了本篇介紹的 chaingroup 之外, Celery 還提供了另外幾種 Workflow :

  • chord
  • map
  • starmap
  • chunk

有興趣的話可以進一步參閱 文件 ,說不定可以意外發現適合的 Workflow 解決長久以來的困擾。

Python Celery 教學 (4) - Queues

References

http://docs.celeryproject.org/en/latest/userguide/canvais.html

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!