提到 Python 非同步(asynchronous)工作通常都會想到 Celery ,而 Web-based 應用程式也經常會將耗時的要求利用非同步的方式完成,所以 Django, Flask 等 Web framework 也幾乎都會整合 Celery 。
其中 Flask 的 Celery 相關套件相對鮮少維護,因此不建議利用 Flask Celery 套件整合 Celery 。
所幸用 Flask 整合 Celery 並非難事,本文將紀錄如何不利用 Flask 套件的情況下整合 Celery 。
本文環境
- Python 3.6.5
- Flask 1.0.2
- celery 4.2.1
- redis 3.2.0
$ pip install celery redis
本文同時使用 Redis DB 作為 celery 的 broker 與 result backend ,請確保 Redis DB 在執行環境中正確運行。
此外,如從未使用過 Celery ,可以閱讀 此篇教學 。
專案結構
以下為本文 Flask 專案結構:
.
├── app.py
└── tasks.py
app.py
內容:
from flask import Flask
def create_app():
flask_app = Flask(__name__)
return flask_app
app = create_app()
@app.route('/', methods=['GET'])
def index():
from tasks import add
r = add.delay(1, 2)
r.wait()
return f'1 + 2 = {r.result}'
上述程式主要是在 index()
中呼叫一個 celery task - add ,接著等待 add 非同步執行完成後並顯示在頁面上。
tasks.py
內容:
from celery import Celery
from app import create_app
REDIS = 'redis://127.0.0.1:4307/0'
def make_celery(app):
app.config['CELERY_ACCEPT_CONTENT'] = ['json']
app.config['CELERY_TASK_SERIALIZER'] = 'json'
app.config['CELERY_RESULT_SERIALIZER'] = 'json'
app.config['CELERY_RESULT_BACKEND'] = REDIS
app.config['CELERY_BROKER_URL'] = REDIS
_celery = Celery(
app.import_name,
backend=REDIS,
broker=REDIS,
)
_celery.conf.update(app.config)
class ContextTask(_celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
_celery.Task = ContextTask
return _celery
celery = make_celery(create_app())
@celery.task()
def add(a, b):
return a + b
tasks.py
主要利用 make_celery 函式建立 Celery 物件,設定 Celery worker 只接受 JSON 資料格式的傳遞,並且將 broker 與 result backend 設定為 Redis DB, 再為 Celery task 加上 Flask 的 app_context ,讓 Celery 能夠取得 Flask application-level 的資料,例如 Flask 的 config / logger 等。
最後定義 1 個 celery task - add, 供人呼叫使用。
執行 Flask 與 Celery worker
用以下指令同時執行 Flask 與 Celery worker 試試:
$ FLASK_ENV=app.py flask run & celery -A tasks worker --concurrency=3 --loglevel=DEBUG
成功之後,打開 http://127.0.0.1:5000/ 應該就能夠看到執行畫面囉!
Happy Coding!
References
http://flask.pocoo.org/docs/1.0/patterns/celery/