Python Celery 教學 (4) - Queues
Last updated on Nov 22, 2023 in Python 模組/套件推薦 , Python 程式設計 - 高階 by Amo Chen ‐ 2 min read
接著談談 Celery 的重點 - Queues!
先前的範例都是直接執行 tasks.py 啟動 Worker ,接下來利用 celery
指令啟動 Worker 試試:
$ celery -A tasks.app worker -l info
順利啟動之後會看到以下畫面:
[config]
.> app: tasks:0x1020da748
.> transport: redis://127.0.0.1:6379/0
.> results: redis://127.0.0.1:6379/1
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
. tasks.mul
. tasks.resize_photo
. tasks.say
. tasks.say_hello
[2018-08-24 11:04:56,245: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2018-08-24 11:04:56,260: INFO/MainProcess] mingle: searching for neighbors
[2018-08-24 11:04:57,280: INFO/MainProcess] mingle: all alone
[2018-08-24 11:04:57,303: INFO/MainProcess] celery@localhost ready.
上述的畫面可以看到 Worker 啟動時的資訊,其中有一個區塊有特別標示出 queues:
[queues]
.> celery exchange=celery(direct) key=celery
事實上,每一個 Worker 都可以指定要消耗的工作佇列(Queues),因此可以看到啟動 Worker 時有個參數 -Q
可以指定工作佇列。
例如啟動指定只消耗名稱為 foo
bar
baz
的 Worker:
$ celery -A proj worker -l info -Q foo,bar,baz
所以如果要讓 Worker 指定多個要消耗的工作佇列可以用:
$ celery -A tasks.app worker -l info -Q celery,celery2
接著就會看到 Worker 啟動畫面的 Queues 變成 2 個工作佇列:
[queues]
.> celery exchange=celery(direct) key=celery
.> celery2 exchange=celery2(direct) key=celery2```
有多個工作佇列之後,要如何任意把工作送到不同的工作佇列呢?
答案就在 [Routing Tasks](http://docs.celeryproject.org/en/latest/userguide/routing.html)
首先談到 1 個 Queue 會有以下 3 種設定( `exchange` , `exchange_type` , `routing_key` ):
{ ’exchange’: ‘celery’, ’exchange_type’: ‘direct’, ‘routing_key’: ‘celery’ }
這是由於 AMQP(Advanced Message Queuing Protocol) 協定的規範,所以定義一個 Queue 會有這 3 種設定,詳情可以看 RabbitMQ 的 [說明文件](https://www.rabbitmq.com/tutorials/amqp-concepts.html) ,簡單來說就是定義一個 task 要如何被分配到不同工作佇列的方式。而由於有些 backend 本來就不是為了 AMQP 而打造的(例如 Redis ) ,所以不支援 `exchange` ,因此 Celery 直接把 `exchange` 指定為與 Queue 名稱相同的值,確保相同的設定都能夠運作。
> The non-AMQP backends like Redis or SQS don’t support exchanges, so they require the exchange to have the same name as the queue. Using this design ensures it will work for them as well.
所以回到剛剛啟動 Worker 畫面中的 Queues 區塊,就可以發現工作佇列名稱後的 exchange 與 key 字樣其實就是對應到 `exchange` 與 `routing_key` 的設定,而括號內的 `direct` 則是對應到 `exchange_type` :
[queues] .> celery exchange=celery(direct) key=celery .> celery2 exchange=celery2(direct) key=celery2```
有這些資訊之後,我們就能夠在 apply_async
指定 queue
與 routing_key
,將 task 派送至不同的 queue 中 :
>>> from tasks import add
>>> add.apply_async(args=(1, 2), queue='celery2', routing_key='celery2')
除了 apply_async
之外,不同的 tasks 也可以事先設定要在哪個 queue 中執行,只要設定 app.conf.task_routes
即可:
task_routes = ([
('tasks.add', {'queue': 'celery'}),
('tasks.resize_photo', {'queue': 'celery2'}),
],)
app.conf. task_routes = task_routes
設定成功之後,只要呼叫相對應的 task 就會被自動派往設定好的 queue 中。
以上為基本 Celery Queue 的觀念。
因為本文力求簡單故以 redis 作為 broker ,如果使用是 RabbitMQ 等支援 AMQP 作為 broker 者,推薦進一步閱讀官方文件 Special Routing Options 。
References
http://docs.celeryproject.org/en/latest/userguide/routing.html