Python Celery 教學 (4) - Queues

Last updated on  Nov 22, 2023  in  Python 模組/套件推薦 , Python 程式設計 - 高階  by  Amo Chen  ‐ 2 min read

接著談談 Celery 的重點 - Queues!

先前的範例都是直接執行 tasks.py 啟動 Worker ,接下來利用 celery 指令啟動 Worker 試試:

$ celery -A tasks.app worker -l info

順利啟動之後會看到以下畫面:

[config]
.> app:         tasks:0x1020da748
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/1
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add
  . tasks.mul
  . tasks.resize_photo
  . tasks.say
  . tasks.say_hello

[2018-08-24 11:04:56,245: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2018-08-24 11:04:56,260: INFO/MainProcess] mingle: searching for neighbors
[2018-08-24 11:04:57,280: INFO/MainProcess] mingle: all alone
[2018-08-24 11:04:57,303: INFO/MainProcess] celery@localhost ready.

上述的畫面可以看到 Worker 啟動時的資訊,其中有一個區塊有特別標示出 queues:

[queues]
.> celery           exchange=celery(direct) key=celery

事實上,每一個 Worker 都可以指定要消耗的工作佇列(Queues),因此可以看到啟動 Worker 時有個參數 -Q 可以指定工作佇列。

例如啟動指定只消耗名稱為 foo bar baz 的 Worker:

$ celery -A proj worker -l info -Q foo,bar,baz

所以如果要讓 Worker 指定多個要消耗的工作佇列可以用:

$ celery -A tasks.app worker -l info -Q celery,celery2

接著就會看到 Worker 啟動畫面的 Queues 變成 2 個工作佇列:

[queues]
.> celery           exchange=celery(direct) key=celery
.> celery2          exchange=celery2(direct) key=celery2```

有多個工作佇列之後,要如何任意把工作送到不同的工作佇列呢?

答案就在 [Routing Tasks](http://docs.celeryproject.org/en/latest/userguide/routing.html)

首先談到 1 個 Queue 會有以下 3 種設定( `exchange` , `exchange_type` , `routing_key` ):

{ ’exchange’: ‘celery’, ’exchange_type’: ‘direct’, ‘routing_key’: ‘celery’ }


這是由於 AMQP(Advanced Message Queuing Protocol) 協定的規範,所以定義一個 Queue 會有這 3 種設定,詳情可以看 RabbitMQ 的 [說明文件](https://www.rabbitmq.com/tutorials/amqp-concepts.html) ,簡單來說就是定義一個 task 要如何被分配到不同工作佇列的方式。而由於有些 backend 本來就不是為了 AMQP 而打造的(例如 Redis ) ,所以不支援 `exchange` ,因此 Celery 直接把 `exchange` 指定為與 Queue 名稱相同的值,確保相同的設定都能夠運作。

> The non-AMQP backends like Redis or SQS don’t support exchanges, so they require the exchange to have the same name as the queue. Using this design ensures it will work for them as well.

所以回到剛剛啟動 Worker 畫面中的 Queues 區塊,就可以發現工作佇列名稱後的 exchange 與 key 字樣其實就是對應到 `exchange` 與 `routing_key` 的設定,而括號內的 `direct` 則是對應到 `exchange_type` :

[queues] .> celery exchange=celery(direct) key=celery .> celery2 exchange=celery2(direct) key=celery2```

有這些資訊之後,我們就能夠在 apply_async 指定 queuerouting_key ,將 task 派送至不同的 queue 中 :

>>> from tasks import add
>>> add.apply_async(args=(1, 2), queue='celery2', routing_key='celery2')

除了 apply_async 之外,不同的 tasks 也可以事先設定要在哪個 queue 中執行,只要設定 app.conf.task_routes 即可:

task_routes = ([
    ('tasks.add', {'queue': 'celery'}),
    ('tasks.resize_photo', {'queue': 'celery2'}),
],)
app.conf. task_routes = task_routes

設定成功之後,只要呼叫相對應的 task 就會被自動派往設定好的 queue 中。

以上為基本 Celery Queue 的觀念。

因為本文力求簡單故以 redis 作為 broker ,如果使用是 RabbitMQ 等支援 AMQP 作為 broker 者,推薦進一步閱讀官方文件 Special Routing Options

References

http://docs.celeryproject.org/en/latest/userguide/routing.html

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!