Python Celery 教學 (1) - 介紹與初次使用
Last updated on Nov 22, 2023 in Python 模組/套件推薦 , Python 程式設計 - 高階 by Amo Chen ‐ 4 min read
Celery 是一套頗成熟的 Distributed Task Queue
解決方案,讓我們可以非同步(asynchronous)執行任務(tasks) / 工作(jobs) ,這種非同步的作法很常見於 Web Application 。舉個例子說明適合使用 Celery 的情況也許會更清楚,假設有一使用者需要透過 Web Application 匯出一份龐大的資料(可能執行時間很長,超過 30 分鐘),在這情況之下,我們也無法要求使用者一直開著該網頁不關,這時候比較好的作法就是利用非同步的方式執行匯出資料的工作,把工作移到背景執行,然後告知使用者工作執行完畢後會透過 Email / 即時通訊軟體通知工作完成,讓使用者可以回來下載資料,如此一來,使用者就不需要一直開著網頁佔用伺服器的連線數, Celery 就是應用在這種非同步執行的情況下。
Celery 目前已經被許多公司使用(詳見 Companies/websites using Celery ),其中較知名的有 Instagram, Gandi, Red Hat, Uber 等等,所以算是蠻值得一學的 Python 模組。
本篇將學習如何使用 Celery 。
名詞介紹
在使用 Celery 前,先介紹幾個名詞,了解這些名詞有助於 Celery 的運作。
Celery Application
在使用 Celery 時,需要實例化(instantiated) Celery ,這個實例(instance)就被稱為 Application 或者 app
,因此可以看到 Celery 文件中通常會用 app
來命名。另外, Celery 是 thread-safe 的,所以可以多個 Celery Application 共存。
The Celery library must be instantiated before use, this instance is called an application (or app for short). The application is thread-safe so that multiple Celery applications with different configurations, components, and tasks can co-exist in the same process space.
實例化 Celery 的程式碼如下:
>>> from celery import Celery
>>> app = Celery()
Task
Task 是 Celery 執行的最小單位,如果要讓工作可以非同步進行,就得把這些工作切分成一個個 Task 。等於是定義好 Celery Application 提供哪些 Task 被非同步執行。
可以看到官方文件範例定義 Task 的程式碼如下:
>>> @app.task
... def add(x, y):
... return x + y
Worker
有 Task 之後,就需要有 Process 負責執行 Task ,這樣才能夠達到非同步執行的目標,這時候就需要 Worker 在背景執行 Task 。所以 Celery 官方文件也提到了啟動 Worker 的指令:
$ celery -A tasks worker --loglevel=info
-A tasks
的意思是為名稱為 tasks
的 Celey Application 執行 Worker 。
Task Message & Broker
定義好 Task 與啟動 Worker 之後,要怎麼叫 Worker 執行 Task 呢?
這時候就需要靠 Task Message 告訴 Celery Application 要非同步執行某個 Task , Celery Application 會幫忙找到 Worker 執行 Task 。而 Task Message 本身不會夾帶任何程式碼,只會帶有要執行的 Task 名稱及參數等資訊(如果可以夾帶程式碼就可以執行任意程式碼,造成可怕的安全漏洞)。然而, Celery 並不負責 Task Message 的傳遞,這時候就需要 1 個稱為 Broker 的角色負責傳遞 Task Message ,也因此可以看到 Celery 官方文件提到需要 Broker 。
目前 Celery 支援的 Brokers:
- RabbitMQ
- Redis
- Amazon SQS
- Zookeeper
Result Backend
最後 1 個角色是 Result Backend 。
如果要追蹤 Task 的執行的狀態(例如 PENDING, STARTED, SUCCESS, FAILURE, RETRY, REVOKED 等狀態,詳見 Built-in States ),就需要 Result Backend 儲存 Task 的狀態,這就是 Result Backend 的作用,所以也可以看到 Celery 文件中提到 Result Backend 的相關設定。
在系統架構不大的情況下,個人是蠻推薦用 redis 作為 Broker 及 Result Backend 的,因為 redis 同時也可作為 cache server 使用,可以讓系統架構單純許多,開發速度也會快一些,因此本文只會利用 redis 作為 Broker & Result Backend 。
以上就是使用 Celery 所必須認識的幾個名詞。
初試 Celery
環境要求
以下是本文的環境要求,安裝方式就不再贅述了。
- redis 4.0
- python 3.6
安裝 Celery
- celery 4.2.1
- celery[redis]
$ pip install celery
$ pip install "celery[redis]"
設定 Celery Application & 定義 1 個 Task
如同前述介紹,使用 Celery 時,需要先實例化 1 個 Celery Application 。此處示範以 redis 作為 Broker & Result Backend 並實例化 Celery Application 。
# -*- coding: utf-8 -*-
from celery import Celery
app = Celery(
'tasks',
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/1',
)
@app.task
def say_hello():
print('Hello')
上述程式碼將 Celery Application 名稱設定為 tasks
,並將 broker, result backend 分別設定在 redis://127.0.0.1:6379/0
與 redis://127.0.0.1:6379/1
。
接著用 @app.task
decorator 定義 1 個 say_hello 的 task 。
Hello, Celery Worker
定義好 Celery Application 與 Task 之後,就需要 Worker 等著執行 Task ,這邊暫且不透過 celery
指令啟動 Worker ,而是透過添加以下 2 行程式碼,讓剛剛寫好的程式可以直接執行,變成 Worker 。
if __name__ == '__main__':
app.worker_main()
所以完整的程式如下(檔名為 tasks.py
):
# -*- coding: utf-8 -*-
from celery import Celery
app = Celery(
'tasks',
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/1',
)
@app.task
def say_hello():
print('Hello')
if __name__ == '__main__':
app.worker_main()
接下來直接執行 tasks.py
就能夠看到 Worker 啟動的畫面了:
$ python tasks.py
...(略)...
...(略)...
[config]
.> app: __main__:0x10335c828
.> transport: redis://127.0.0.1:6379/0
.> results: redis://127.0.0.1:6379/1
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
p.s. 執行成功之後先不要關掉 Worker ,下一步會用到
執行 Task 吧!
Worker 執行起來之後,接著要怎麼非同步執行 Task 呢?
先另開個 Terminal ,執行 Python 直譯器,我們在 Python 直譯器中試著非同步執行 say_hello
吧。
p.s. 請與 tasks.py
在同一層資料夾下執行以下指令
$ python
>>> from tasks import say_hello
>>> say_hello.delay()
<AsyncResult: e8af98f2-920c-43e2-a8e8-175bb8cd88cb>
上述從 tasks.py
import say_hello task 之後,利用 celery 的 .delay
方法發出非同步執行的請求,然後得到 1 個 AsyncResult object 。
接著就會看到 Worker 印出 Hello
字串,執行結果如下:
[config]
.> app: tasks:0x10b6706a0
.> transport: redis://127.0.0.1:6379/0
.> results: redis://127.0.0.1:6379/1
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[2018-07-23 23:54:44,137: WARNING/ForkPoolWorker-2] Hello
以上就是 Celery 非同步執行的初體驗,下一篇我們將深入了解更多 Celery 相關的知識。
Python Celery 教學 (2) - AsyncResult 與執行工作
Referencs
http://www.celeryproject.org/