Python Celery 教學 (2) - AsyncResult 與執行工作

Last updated on  Nov 22, 2023  in  Python 模組/套件推薦 , Python 程式設計 - 高階  by  Amo Chen  ‐ 4 min read

Python Celery 教學 (1) - 介紹與初次使用 介紹中,我們認識 Celery 幾個重要名詞與初步小小體驗使用 Celery 的過程。

本篇將會進一步介紹更多 Celery 的相關知識與使用方法,其中包括:

  • 淺談 AsyncResult
  • 深入淺出 Calling Tasks

淺談 AsyncResult

上一篇談到非同步執行 Task 後,會得到 1 個 AsyncResult , AsyncResult 除了儲存 Task 的 ID , 執行狀態及執行結果(Task 的回傳值)之外,也封裝若干 Task 的相關操作,例如 revoke 取消執行 Task , get 取得執行結果等等,詳見 AsyncResult

同樣以上一篇的程式碼為例,在不執行 Worker 的情況下,先試著 import Task 並執行它取得 AsyncResult

tasks.py

# -*- coding: utf-8 -*-
from celery import Celery


app = Celery(
    'tasks',
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/1',
)


@app.task
def say_hello():
    print('Hello')
$ python
>>> from tasks import say_hello
>>> r = say_hello.delay()
>>> r
<AsyncResult: e7241224-a970-41c5-83e4-20f30a314f09>
>>> r.task_id
'e7241224-a970-41c5-83e4-20f30a314f09'

首先我們取得 AsyncResult 並指派給變數 r ,並查看其 task_id 屬性,由此可知 <AsyncResult: e7241224-a970-41c5-83e4-20f30a314f09> 中的 e7241224-a970-41c5-83e4-20f30a314f09 就是 Task ID 。

每個 Task 的執行都會有相對應的 Task ID ,要取得 Task 的執行狀態、執行結果都需要透過 Task ID ,這是相當重要的一個屬性。

接著看看變數 r 的執行狀態:

>>> r.state
'PENDING'

由於目前並沒有任何 Worker 可以執行 Task ,因此 Task 的狀態是 PENDING 等待執行。更多狀態詳見 文件

如果此時啟動 Worker ,接下來再查看 r.state 就會發現狀態變了,這是因為 Worker 執行時變更狀態的緣故。

深入淺出 Calling Tasks

定義好一個 Task 之後,該 Task 支援 3 種呼叫方式:

  1. calling (__call__)

其實就是直接呼叫,以 say_hello Task 舉例的話就是 say_hello() 這樣直接呼叫,該 Task 就不會被 Worker 執行,需要手動執行 Task 時,就會覺得這個設計很方便。

  1. delay(*args, **kwargs)

也就是先前範例 say_hello.delay() 所使用的呼叫方式,毫無疑問這種呼叫方式會被 Worker 執行,此種呼叫方式也支援參數帶入,假設以下 Task 需要參數,那其呼叫方式會是 say('Hi', name='Robert')

>>> @app.task
... def say(x, name=None):
...     return name, 'says', x
  1. apply_async(args[, kwargs[, …]])

此種呼叫方式與 delay(*args, **kwargs) 大同小異,不同的地方在於 apply_async 支援 execution options ,也就是額外的執行選項,例如 countdown (倒數後才執行)、 expires (Task Message 一發布就開始算時間,超過時間就視為過期) 、 retry (連線失敗是否重試)等等,詳見 文件

假設要讓 say Task 3 秒後再執行,就可以使用 apply_async 改寫:

>>> say.apply_async('Hi', {name: 'Rebort'}, countdown=3)

再來談談當 Task Message 要發送到 Worker 時,資料究竟用什麼格式傳遞的問題,所以需要了解 Celery Serializers :

Data transferred between clients and workers needs to be serialized, so every message in Celery has a content_type header that describes the serialization method used to encode it. The default serializer is JSON, but you can change this using the task_serializer setting, or for each individual task, or even per message.

目前 Celery 支援以下 4 種序列化的方式:

  1. json
  2. pickle
  3. yaml
  4. msgpack

預設的資料序列化是用 JSON 格式進行傳遞,所以要特別注意有些資料型態如果無法被轉化成 JSON 格式的話,就會造成無法執行的問題,設定序列化的範例為:

>>> add.apply_async((10, 10), serializer='json')

除了序列化之外, Celery 也支援 Task Message 的壓縮,如果有壓縮需求的話詳見 Compression

Signatures

初步了解 Celery Calling Task 的奧妙之後,接下來談談 Celery 提供 Signature 功能, Signature 讓我們可以把各式各樣的 Task 像組合樂高一樣結合。

很多時候 Task 其實只需要直接 delayapply_async 一下就好了,但有些更複雜的需求,會需要結合多種 Task 進行,例如我們希望做完 Task1 後,再將結果放入 Task2 作為參數之一並執行 Task2 :

Task2(Task1(arg1), arg2)

這時候 Signatures 就是負責幫忙把 Task1(arg1) 包裝成可以序列化傳遞的重要角色。

A signature() wraps the arguments, keyword arguments, and execution options of a single task invocation in a way such that it can be passed to functions or even serialized and sent across the wire.

將 Task 包裝成 Signature 很簡單,舉先前的 say Task 為例,只要用 signatures 就可以:

>>> say.signature(args=('Hi',), kwargs={'name': 'Robert'})
tasks.say('Hi', kwargs={'name': 'Robert'})

>>> say.s('Hi', name='Robert')
tasks.say('Hi', name='Robert')

>>> type(say.signature(args=('Hi',), kwargs={'name': 'Robert'}))
<class 'celery.canvas.Signature'>

上述範例也可以看到呼叫 signature 會回傳一個 Signature 實例。另外值得注意的是此時 Task 並不會被執行。

那 Task 被包裝成 Signature 之後要如何執行呢?一樣透過剛剛介紹 3 種 Calling Task 的方式就可以執行了:

>>> say.s('Hi', name='Robert')()
<AsyncResult: a8fcff23-ec89-4cd6-b846-26c6ea9642fa>
>>> say.s('Hi', name='Robert').delay()
<AsyncResult: 999af414-31db-461d-a7e8-dca737dd337f>
>>> say.s('Hi', name='Robert').apply_async()
<AsyncResult: f2334d7b-761e-4909-bec7-38c3c83029a4>

認識 Signature 之後,再來介紹 Signature 1 個重要特性 Partials 。

Signature 支援將 Task 與部分參數一起包裝成 Signature ,例如以下 Task ,可以只寫成 add.s(4)

@app.task
def add(x, y):
    print('x={}, y={}'.format(x, y))
    return x + y

因為此時還沒執行,所以可以在執行前補足所需要的參數:

>>> sig = add.s(4)
>>> sig(4)
8
>>> sig.delay(4)
<AsyncResult: 87cefc29-b84c-4982-be99-fe477b8c689e>

Partials 的特性很適合當作 Callback 使用,例如先把 Task1 轉成 Signature 之後,丟進另一個 Task2 ,等到 Task2 執行完之後產生 Task1 所需要的完整參數,再執行 Task1 。

Callbacks

要如何將 Task 轉成 Signature 後再變成另一個 Task 的 Callback 呢?可以使用在呼叫 Task 時額外指定 link 參數,然後將 Task 轉成 Signature 後放到 link 參數,例如:

>>> r = add.apply_async((2, 2), link=add.s(16))

如此一來就會看到 Worker 印出以下結果:

[2018-07-24 19:48:12,237: WARNING/ForkPoolWorker-2] x=2, y=2
[2018-07-24 19:48:12,237: WARNING/ForkPoolWorker-1] x=4, y=16

上述範例神奇的地方在於, link=add.s(16) 明明是 Partial 的 Signature ,為何能正確執行?這是由於預設的 Callback 執行方式,會將先前執行的 Task 執行結果當作 Callback 的參數,再呼叫 Callback ,所以 2 + 2 得到 4 之後,再將 4 視為 Callback 所需的參數之一,放進 add.s(16) 中執行。

The callback task will be applied with the result of the parent task as a partial argument

此外,被 link 的 Task 稱為 Child Task , link 別人的 Task 則是 Parent Task 。

那麼 r 的結果是 20 還是 4 ? 答案是:

>>> r.result
4

這是由於 link 僅是 Callback ,因此其結果並不會作為最終結果回傳,所以 r 的結果仍然是 2 + 2 = 4 。

那要怎麼取得 add.s(16) 的執行結果?可以透過 r.children 屬性取得 Callback 的執行結果。

>>> r.children
[<AsyncResult: 51b01a91-f151-47de-b414-38e72fd75337>]
>>> r.children[0].result
20

Celery 除了 link 參數之外,也提供 link_error 作為發生 error 時的 Callback 使用,詳情請見 Linking (callbacks/errbacks)

Python Celery 教學 (3) - Workflow

References

http://docs.celeryproject.org/en/latest/userguide/calling.html

FOLLOW US

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!