用 Python 學 Server-Sent Events (SSE) — 以 Flask 實作為例
Posted on May 21, 2024 in Python 程式設計 - 中階 by Amo Chen ‐ 5 min read
你有想過 sever 端要怎麼主動通知 client 端有事件發生或資料更新嗎?
除了 WebSocket 還有沒有其他做法呢?
一起看看 1 種稱為 Server-Sent Events 的技術吧!
本文環境
- Python 3
- Flask
SSE(Server-Sent Events) 簡介
以往談到 Web 即時(realtime)雙向通訊手段,多數人直覺會想到 WebSocket , 藉著 WebSocket 能夠做到即時聊天、遊戲等應用。
但絕大多數 Web 服務並不需要用到 WebSocket 這種即時雙向通訊的技術,反而只需要被動接受來自 server 端的通知,即 server 告知 client 端某項任務/要求已完成,例如線上候位、候診等服務,只需要通知 client 端現在進度到幾號。
類似這種查詢現在工作狀態或進度的服務,一般可以使用輪詢(polling)的方式達成,例如定期每秒呼叫特定 API 取得工作狀態或進度,即可達成類似即時更新的效果。
但是輪詢的缺點是 client 端並不知道 server 端何時更新資料,所以必須持續送出要求(request)才能取得最新資料,甚至在 server 端還沒有更新資料的情況下,也需要一直送出要求,如此一來將造成浪費 server 端運算資源、網路資源的情況。
Server-Sent Events(SSE) 的出現,使得 server 具有主動發送資料給 client 端的能力,改善以往網頁需要使用輪詢等技術才能取得最新資料的情況。
Client 端可以藉由建立 EventSource 與 server 建立長期連線,就像訂閱訊息一樣,當 server 端有任何事件/變更發生時,就透過該連線主動通知 client 端, client 端就能收到 Event 並觸發後續流程。
值得注意的是 SSE 並不像 WebSocket 那樣能夠做到雙向即時溝通,只能做到單向的通知,即 server 端到 client 端,所以特別適合應用在線上候位、候診等單向通知的情境。
SSE 詳細的介紹影片可以看此影片 Don’t Use Websockets (Until You Try This…) 。
p.s. 其實可以把 SSE 視為 1 種串流(stream), 可以把資料不斷地傳給 client 端
用 Flask 做 1 個簡單的 SSE server
SSE 實際上是基於 HTTP 協定的技術,運作上很像是使用者呼叫 1 個 API 之後,該 API 會不斷地回應資料,與我們一般認知 API 只會回應 1 次不同,後端運作上很像以下 Python 程式碼:
@app.route('/stream')
def stream():
while True:
data = get_data()
send_response(content_type='text/event-stream', data=data)
當 client 端呼叫 /stream
API 時,就會持續收到 Content-Type: text/event-stream
的資料。
p.s. Content-Type: text/event-stream
是 SSE 的規定,這代表我們所回應的資料是純文字,所以想傳送 JSON 等格式的資料,都是轉成 string 之後,到 client 端自行處理
另外,每 1 個 event 主要分為 3 個欄位:
- Id 代表 event Id
- Event 代表 event type
- Data 代表傳送的資料
Sever 端傳送資料時可以按照這 3 個欄位的格式填充資料,需以 \n\n
做為結尾,並使用 UTF-8 編碼,詳見 Interpreting an event stream:
id: [string] event: [string] data: [string]\n\n
p.s. 這 3 個欄位都不是必填,所以可以只傳空行觸發 event
知道 SSE 的樣貌之後,我們可以實際使用 Flask 實作 SSE server:
import time
import json
from flask import Flask, Response
app = Flask(__name__)
@app.route('/stream')
def stream():
def iter_data():
while True:
yield 'data:' + json.dumps({'time': time.strftime('%Y-%m-%d %H:%M:%S')}) + '\n\n'
time.sleep(3)
return iter_data(), {'Content-Type': 'text/event-stream'}
if __name__ == '__main__':
app.run(debug=True)
Flask 也支援以串流(stream)方式傳送資料,最簡單的方式是 API 回傳 1 個 generator 即可,所以上述程式碼的 stream()
API 回傳 1 個 iter_data()
generator, 該 generator 會在無窮迴圈中 yield
資料,同時為了符合 SSE 規定,額外以 1 個 dictionary 指定 Content-Type
為 text/event-stream
。
p.s. 如果 SSE server 前面還有擋 1 層 nginx 的話,可以多設定 header X-Accel-Buffering: no
, 避免 nginx 把 SSE server 的回應存到 buffer, 造成 SSE 運作不正常
p.s. 如果你喜歡使用 FastAPI 可以參考 StreamingResponse 章節
這就是 SSE 在後端的樣貌,無論用什麼框架(framework)或者語言都是類似的作法。
接下來,試試看用 curl
指令測試是否會持續收到來自 server 的資料,指令如下:
$ curl http://127.0.0.1:5000/stream
運作正常的話,就可以看到一直收到類似以下的字串:
data:{"time": "2024-05-21 14:55:42"}
data:{"time": "2024-05-21 14:55:45"}
data:{"time": "2024-05-21 14:55:48"}
那前端呢?
誠如前文所述,前端只要建立 1 個 EventSource 就可以與 server 端建立連線。
前端最簡單的 HTML & JavaScript 如下所示:
index.html
<!DOCTYPE html>
<html>
<head>
<title>SSE Example</title>
</head>
<body>
<h1>Server-Sent Events</h1>
<div id="events"></div>
<script type="text/javascript">
const eventSource = new EventSource('/stream');
eventSource.onmessage = function(event) {
const eventsDiv = document.getElementById('events');
eventsDiv.innerHTML += `<p>${event.data}</p>`;
};
</script>
</body>
</html>
與 Flask 整合之後, Python 程式碼如下:
flask_sse_demo.py
import json
import time
from flask import Flask, Response, render_template
app = Flask(__name__)
@app.route('/stream')
def stream():
def iter_data():
while True:
yield 'data:' + json.dumps({'time': time.strftime('%Y-%m-%d %H:%M:%S')}) + '\n\n'
time.sleep(3)
return iter_data(), {'Content-Type': 'text/event-stream'}
@app.route('/')
def index():
return render_template('index.html')
if __name__ == '__main__':
app.run(debug=True)
上述 'index.html'
需要放在 templetes
資料夾底下,專案結構為:
.
├── templates
│ └── index.html
└── flask_sse_demo.py
可以使用以下指令啟動 SSE server:
$ python flask_sse_demo.py
啟動之後,可以用瀏覽器打開網址 http://127.0.0.1:5000
, 就會看到 SSE 開始運作:
也可以在開發者工具看到 SSE 傳輸資料的資訊:
如果想進一步將 JSON 資料解開,則只需要在 client 端使用 JSON.parse() 即可。
要怎麼在 while 迴圈裡面接收 event?
相信文到此處,大家都知道 SSE 實際怎麼運作了。
不過應該有很多人會跟我一樣好奇,既然 SSE 可能是以 while
或 for
迴圈實作(或無窮迴圈),那要怎麼針對個別使用者發送專屬的資料?
在此提供 1 個比較簡單的做法:
使用 Redis, Kafka, RabbitMQ 等訊息訂閱/發布服務(pub-sub)
直接看以下程式碼可能會更容易理解,以下程式碼使用 Redis pub-sub 功能,訂閱事件以觸發 SSE:
r = redis.Redis(host='localhost', port=6379, db=0)
@app.route('/stream')
def stream():
def push_events():
sub = r.pubsub()
sub.subscribe('the_channel_you_want')
while True:
message = sub.get_message()
if message:
# do something with the message
data = ...
yield f'data:{data}\n\n'
time.sleep(1)
return push_events(), {'Content-Type': 'text/event-stream'}
上述程式碼中的 the_channel_you_want
就是後端接收事件的頻道(channel),我們可以藉由發布訊息到該頻道觸發 SSE (設計上可以不同使用者訂閱不同頻道,或者訂閱相同頻道,端看需求為何)。
p.s. 本範例僅供教學參考,實際上設計也要注意每個訊息訂閱/發布服務都有連線數上限,過多的 clients 也會達到連線數上限
Server-Sent Events 的問題
如前文所述,建立 EventSource 之後瀏覽器會與 server 建立長期連線,如果 Sever 端是採用多執行緒架構,就會有 1 個執行緒被佔用;如果 Server 端是採用多 processes 架構,就會有 1 個 process 被佔用。
佔用的情況會導致 server 一直有運算資源無法釋放,最終造成無法服務其他 clients 。
如果想模擬這情況,可以把前述範例的 app.run()
改為下列程式碼,模擬僅有 1 個 process 單執行緒的情況:
app.run(debug=True, processes=1, threaded=False)
改成上述形式之後,可以使用 2 個終端機各自輸入 curl http://127.0.0.1:5000/stream
,就會發現其中 1 個指令始終無法收到資料,只有當另 1 個指令結束之後,才能收到資料,這就是 process 被佔用導致無法服務其他 clients 的情況,這個情況也間接揭露 SSE 在面對巨量連線時可能會產生效能瓶頸的缺點。
所以後端架構如果要加上 SSE 功能,最好與一般的 API server 分開,避免 SSE 佔用運算資源的情況發生,也避免影響一般的 API 服務。
總結
SSE 是 1 個介於 polling 與 WebSocket 的技術,是相對 WebSocket 輕量的技術,允許伺服器單向持續地向客戶端發送資料,而不需要客戶端持續發送請求詢問,這對於需要即時更新的應用場景非常有用,例如股票行情、聊天應用、即時通知等。
不過 SSE 佔用連線、運算資源的特性,也使得它在多連線數的情況下可能會產生效能瓶頸,因此實作 SSE 時須多加考慮過多連線的情況下應該如何處理。
以上!
Enjoy!
References
HTML Standard - Server-Sent Events
Streaming Contents — Flask Documentation (2.3.x)