Python Celery 教學 (1) - 介紹與初次使用

Last updated on  Nov 22, 2023  in  Python 模組/套件推薦 , Python 程式設計 - 高階  by  Amo Chen  ‐ 4 min read

Celery 是一套頗成熟的 Distributed Task Queue 解決方案,讓我們可以非同步(asynchronous)執行任務(tasks) / 工作(jobs) ,這種非同步的作法很常見於 Web Application 。舉個例子說明適合使用 Celery 的情況也許會更清楚,假設有一使用者需要透過 Web Application 匯出一份龐大的資料(可能執行時間很長,超過 30 分鐘),在這情況之下,我們也無法要求使用者一直開著該網頁不關,這時候比較好的作法就是利用非同步的方式執行匯出資料的工作,把工作移到背景執行,然後告知使用者工作執行完畢後會透過 Email / 即時通訊軟體通知工作完成,讓使用者可以回來下載資料,如此一來,使用者就不需要一直開著網頁佔用伺服器的連線數, Celery 就是應用在這種非同步執行的情況下。

Celery 目前已經被許多公司使用(詳見 Companies/websites using Celery ),其中較知名的有 Instagram, Gandi, Red Hat, Uber 等等,所以算是蠻值得一學的 Python 模組。

本篇將學習如何使用 Celery 。

名詞介紹

在使用 Celery 前,先介紹幾個名詞,了解這些名詞有助於 Celery 的運作。

Celery Application

在使用 Celery 時,需要實例化(instantiated) Celery ,這個實例(instance)就被稱為 Application 或者 app ,因此可以看到 Celery 文件中通常會用 app 來命名。另外, Celery 是 thread-safe 的,所以可以多個 Celery Application 共存。

The Celery library must be instantiated before use, this instance is called an application (or app for short). The application is thread-safe so that multiple Celery applications with different configurations, components, and tasks can co-exist in the same process space.

實例化 Celery 的程式碼如下:

>>> from celery import Celery
>>> app = Celery()

Task

Task 是 Celery 執行的最小單位,如果要讓工作可以非同步進行,就得把這些工作切分成一個個 Task 。等於是定義好 Celery Application 提供哪些 Task 被非同步執行。

可以看到官方文件範例定義 Task 的程式碼如下:

>>> @app.task
... def add(x, y):
...     return x + y

Worker

有 Task 之後,就需要有 Process 負責執行 Task ,這樣才能夠達到非同步執行的目標,這時候就需要 Worker 在背景執行 Task 。所以 Celery 官方文件也提到了啟動 Worker 的指令:

$ celery -A tasks worker --loglevel=info

-A tasks 的意思是為名稱為 tasks 的 Celey Application 執行 Worker 。

Task Message & Broker

定義好 Task 與啟動 Worker 之後,要怎麼叫 Worker 執行 Task 呢?

這時候就需要靠 Task Message 告訴 Celery Application 要非同步執行某個 Task , Celery Application 會幫忙找到 Worker 執行 Task 。而 Task Message 本身不會夾帶任何程式碼,只會帶有要執行的 Task 名稱及參數等資訊(如果可以夾帶程式碼就可以執行任意程式碼,造成可怕的安全漏洞)。然而, Celery 並不負責 Task Message 的傳遞,這時候就需要 1 個稱為 Broker 的角色負責傳遞 Task Message ,也因此可以看到 Celery 官方文件提到需要 Broker 。

目前 Celery 支援的 Brokers:

  • RabbitMQ
  • Redis
  • Amazon SQS
  • Zookeeper

詳見 Broker Overview

Result Backend

最後 1 個角色是 Result Backend 。

如果要追蹤 Task 的執行的狀態(例如 PENDING, STARTED, SUCCESS, FAILURE, RETRY, REVOKED 等狀態,詳見 Built-in States ),就需要 Result Backend 儲存 Task 的狀態,這就是 Result Backend 的作用,所以也可以看到 Celery 文件中提到 Result Backend 的相關設定。

在系統架構不大的情況下,個人是蠻推薦用 redis 作為 Broker 及 Result Backend 的,因為 redis 同時也可作為 cache server 使用,可以讓系統架構單純許多,開發速度也會快一些,因此本文只會利用 redis 作為 Broker & Result Backend 。

以上就是使用 Celery 所必須認識的幾個名詞。

初試 Celery

環境要求

以下是本文的環境要求,安裝方式就不再贅述了。

  • redis 4.0
  • python 3.6

安裝 Celery

  • celery 4.2.1
  • celery[redis]
$ pip install celery
$ pip install "celery[redis]"

設定 Celery Application & 定義 1 個 Task

如同前述介紹,使用 Celery 時,需要先實例化 1 個 Celery Application 。此處示範以 redis 作為 Broker & Result Backend 並實例化 Celery Application 。

# -*- coding: utf-8 -*-
from celery import Celery


app = Celery(
    'tasks',
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/1',
)


@app.task
def say_hello():
    print('Hello')

上述程式碼將 Celery Application 名稱設定為 tasks ,並將 broker, result backend 分別設定在 redis://127.0.0.1:6379/0redis://127.0.0.1:6379/1

接著用 @app.task decorator 定義 1 個 say_hello 的 task 。

Hello, Celery Worker

定義好 Celery Application 與 Task 之後,就需要 Worker 等著執行 Task ,這邊暫且不透過 celery 指令啟動 Worker ,而是透過添加以下 2 行程式碼,讓剛剛寫好的程式可以直接執行,變成 Worker 。

if __name__ == '__main__':
    app.worker_main()

所以完整的程式如下(檔名為 tasks.py ):

# -*- coding: utf-8 -*-
from celery import Celery


app = Celery(
    'tasks',
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/1',
)


@app.task
def say_hello():
    print('Hello')


if __name__ == '__main__':
    app.worker_main()

接下來直接執行 tasks.py 就能夠看到 Worker 啟動的畫面了:

$ python tasks.py

...(略)...
...(略)...

[config]
.> app:         __main__:0x10335c828
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/1
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery

p.s. 執行成功之後先不要關掉 Worker ,下一步會用到

執行 Task 吧!

Worker 執行起來之後,接著要怎麼非同步執行 Task 呢?

先另開個 Terminal ,執行 Python 直譯器,我們在 Python 直譯器中試著非同步執行 say_hello 吧。

p.s. 請與 tasks.py 在同一層資料夾下執行以下指令

$ python
>>> from tasks import say_hello
>>> say_hello.delay()
<AsyncResult: e8af98f2-920c-43e2-a8e8-175bb8cd88cb>

上述從 tasks.py import say_hello task 之後,利用 celery 的 .delay 方法發出非同步執行的請求,然後得到 1 個 AsyncResult object 。

接著就會看到 Worker 印出 Hello 字串,執行結果如下:

[config]
.> app:         tasks:0x10b6706a0
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/1
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[2018-07-23 23:54:44,137: WARNING/ForkPoolWorker-2] Hello

以上就是 Celery 非同步執行的初體驗,下一篇我們將深入了解更多 Celery 相關的知識。

Python Celery 教學 (2) - AsyncResult 與執行工作

Referencs

http://www.celeryproject.org/

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!