Flask 整合 Celery

Posted on  Jun 1, 2019  in  Flask  by  Amo Chen  ‐ 2 min read

提到 Python 非同步(asynchronous)工作通常都會想到 Celery ,而 Web-based 應用程式也經常會將耗時的要求利用非同步的方式完成,所以 Django, Flask 等 Web framework 也幾乎都會整合 Celery 。

其中 Flask 的 Celery 相關套件相對鮮少維護,因此不建議利用 Flask Celery 套件整合 Celery 。

所幸用 Flask 整合 Celery 並非難事,本文將紀錄如何不利用 Flask 套件的情況下整合 Celery 。

本文環境

  • Python 3.6.5
  • Flask 1.0.2
  • celery 4.2.1
  • redis 3.2.0
$ pip install celery redis

本文同時使用 Redis DB 作為 celery 的 broker 與 result backend ,請確保 Redis DB 在執行環境中正確運行。

此外,如從未使用過 Celery ,可以閱讀 此篇教學

專案結構

以下為本文 Flask 專案結構:

.
├── app.py
└── tasks.py

app.py 內容:

from flask import Flask


def create_app():
    flask_app = Flask(__name__)
    return flask_app


app = create_app()


@app.route('/', methods=['GET'])
def index():
    from tasks import add
    r = add.delay(1, 2)
    r.wait()
    return f'1 + 2 = {r.result}'

上述程式主要是在 index() 中呼叫一個 celery task - add ,接著等待 add 非同步執行完成後並顯示在頁面上。

tasks.py 內容:

from celery import Celery
from app import create_app


REDIS = 'redis://127.0.0.1:4307/0'


def make_celery(app):
    app.config['CELERY_ACCEPT_CONTENT'] = ['json']
    app.config['CELERY_TASK_SERIALIZER'] = 'json'
    app.config['CELERY_RESULT_SERIALIZER'] = 'json'
    app.config['CELERY_RESULT_BACKEND'] = REDIS
    app.config['CELERY_BROKER_URL'] = REDIS

    _celery = Celery(
        app.import_name,
        backend=REDIS,
        broker=REDIS,
    )
    _celery.conf.update(app.config)

    class ContextTask(_celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    _celery.Task = ContextTask
    return _celery


celery = make_celery(create_app())


@celery.task()
def add(a, b):
    return a + b

tasks.py 主要利用 make_celery 函式建立 Celery 物件,設定 Celery worker 只接受 JSON 資料格式的傳遞,並且將 broker 與 result backend 設定為 Redis DB, 再為 Celery task 加上 Flask 的 app_context ,讓 Celery 能夠取得 Flask application-level 的資料,例如 Flask 的 config / logger 等。

最後定義 1 個 celery task - add, 供人呼叫使用。

執行 Flask 與 Celery worker

用以下指令同時執行 Flask 與 Celery worker 試試:

$ FLASK_ENV=app.py flask run & celery -A tasks worker --concurrency=3 --loglevel=DEBUG 

成功之後,打開 http://127.0.0.1:5000/ 應該就能夠看到執行畫面囉!

Happy Coding!

References

http://flask.pocoo.org/docs/1.0/patterns/celery/

FOLLOW US

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!