用 Python 學 Server-Sent Events (SSE) — 以 Flask 實作為例

Posted on  May 21, 2024  in  Python 程式設計 - 中階  by  Amo Chen  ‐ 5 min read

你有想過 sever 端要怎麼主動通知 client 端有事件發生或資料更新嗎?

除了 WebSocket 還有沒有其他做法呢?

一起看看 1 種稱為 Server-Sent Events 的技術吧!

本文環境

SSE(Server-Sent Events) 簡介

以往談到 Web 即時(realtime)雙向通訊手段,多數人直覺會想到 WebSocket , 藉著 WebSocket 能夠做到即時聊天、遊戲等應用。

但絕大多數 Web 服務並不需要用到 WebSocket 這種即時雙向通訊的技術,反而只需要被動接受來自 server 端的通知,即 server 告知 client 端某項任務/要求已完成,例如線上候位、候診等服務,只需要通知 client 端現在進度到幾號。

類似這種查詢現在工作狀態或進度的服務,一般可以使用輪詢(polling)的方式達成,例如定期每秒呼叫特定 API 取得工作狀態或進度,即可達成類似即時更新的效果。

但是輪詢的缺點是 client 端並不知道 server 端何時更新資料,所以必須持續送出要求(request)才能取得最新資料,甚至在 server 端還沒有更新資料的情況下,也需要一直送出要求,如此一來將造成浪費 server 端運算資源、網路資源的情況。

Server-Sent Events(SSE) 的出現,使得 server 具有主動發送資料給 client 端的能力,改善以往網頁需要使用輪詢等技術才能取得最新資料的情況。

Client 端可以藉由建立 EventSource 與 server 建立長期連線,就像訂閱訊息一樣,當 server 端有任何事件/變更發生時,就透過該連線主動通知 client 端, client 端就能收到 Event 並觸發後續流程。

值得注意的是 SSE 並不像 WebSocket 那樣能夠做到雙向即時溝通,只能做到單向的通知,即 server 端到 client 端,所以特別適合應用在線上候位、候診等單向通知的情境。

SSE 詳細的介紹影片可以看此影片 Don’t Use Websockets (Until You Try This…)

p.s. 其實可以把 SSE 視為 1 種串流(stream), 可以把資料不斷地傳給 client 端

用 Flask 做 1 個簡單的 SSE server

SSE 實際上是基於 HTTP 協定的技術,運作上很像是使用者呼叫 1 個 API 之後,該 API 會不斷地回應資料,與我們一般認知 API 只會回應 1 次不同,後端運作上很像以下 Python 程式碼:

@app.route('/stream')
def stream():
    while True:
        data = get_data()
        send_response(content_type='text/event-stream', data=data)

當 client 端呼叫 /stream API 時,就會持續收到 Content-Type: text/event-stream 的資料。

p.s. Content-Type: text/event-stream 是 SSE 的規定,這代表我們所回應的資料是純文字,所以想傳送 JSON 等格式的資料,都是轉成 string 之後,到 client 端自行處理

另外,每 1 個 event 主要分為 3 個欄位:

  1. Id 代表 event Id
  2. Event 代表 event type
  3. Data 代表傳送的資料

Sever 端傳送資料時可以按照這 3 個欄位的格式填充資料,需以 \n\n 做為結尾,並使用 UTF-8 編碼,詳見 Interpreting an event stream

id: [string] event: [string] data: [string]\n\n

p.s. 這 3 個欄位都不是必填,所以可以只傳空行觸發 event

知道 SSE 的樣貌之後,我們可以實際使用 Flask 實作 SSE server:

import time
import json

from flask import Flask, Response


app = Flask(__name__)


@app.route('/stream')
def stream():
    def iter_data():
        while True:
            yield 'data:' + json.dumps({'time': time.strftime('%Y-%m-%d %H:%M:%S')}) + '\n\n'
            time.sleep(3)
    return iter_data(), {'Content-Type': 'text/event-stream'}


if __name__ == '__main__':
    app.run(debug=True)

Flask 也支援以串流(stream)方式傳送資料,最簡單的方式是 API 回傳 1 個 generator 即可,所以上述程式碼的 stream() API 回傳 1 個 iter_data() generator, 該 generator 會在無窮迴圈中 yield 資料,同時為了符合 SSE 規定,額外以 1 個 dictionary 指定 Content-Typetext/event-stream

p.s. 如果 SSE server 前面還有擋 1 層 nginx 的話,可以多設定 header X-Accel-Buffering: no , 避免 nginx 把 SSE server 的回應存到 buffer, 造成 SSE 運作不正常

p.s. 如果你喜歡使用 FastAPI 可以參考 StreamingResponse 章節

這就是 SSE 在後端的樣貌,無論用什麼框架(framework)或者語言都是類似的作法。

接下來,試試看用 curl 指令測試是否會持續收到來自 server 的資料,指令如下:

$ curl http://127.0.0.1:5000/stream

運作正常的話,就可以看到一直收到類似以下的字串:

data:{"time": "2024-05-21 14:55:42"}

data:{"time": "2024-05-21 14:55:45"}

data:{"time": "2024-05-21 14:55:48"}

那前端呢?

誠如前文所述,前端只要建立 1 個 EventSource 就可以與 server 端建立連線。

前端最簡單的 HTML & JavaScript 如下所示:

index.html

<!DOCTYPE html>
<html>
<head>
    <title>SSE Example</title>
</head>
<body>
    <h1>Server-Sent Events</h1>
    <div id="events"></div>
    <script type="text/javascript">
        const eventSource = new EventSource('/stream');
        eventSource.onmessage = function(event) {
            const eventsDiv = document.getElementById('events');
            eventsDiv.innerHTML += `<p>${event.data}</p>`;
        };
    </script>
</body>
</html>

與 Flask 整合之後, Python 程式碼如下:

flask_sse_demo.py

import json
import time

from flask import Flask, Response, render_template

app = Flask(__name__)


@app.route('/stream')
def stream():
    def iter_data():
        while True:
            yield 'data:' + json.dumps({'time': time.strftime('%Y-%m-%d %H:%M:%S')}) + '\n\n'
            time.sleep(3)
    return iter_data(), {'Content-Type': 'text/event-stream'}


@app.route('/')
def index():
    return render_template('index.html')


if __name__ == '__main__':
    app.run(debug=True)

上述 'index.html' 需要放在 templetes 資料夾底下,專案結構為:

.
├── templates
│   └── index.html
└── flask_sse_demo.py

可以使用以下指令啟動 SSE server:

$ python flask_sse_demo.py

啟動之後,可以用瀏覽器打開網址 http://127.0.0.1:5000 , 就會看到 SSE 開始運作:

sse-demo.png

也可以在開發者工具看到 SSE 傳輸資料的資訊:

event-stream.png

如果想進一步將 JSON 資料解開,則只需要在 client 端使用 JSON.parse() 即可。

要怎麼在 while 迴圈裡面接收 event?

相信文到此處,大家都知道 SSE 實際怎麼運作了。

不過應該有很多人會跟我一樣好奇,既然 SSE 可能是以 whilefor 迴圈實作(或無窮迴圈),那要怎麼針對個別使用者發送專屬的資料?

在此提供 1 個比較簡單的做法:

使用 Redis, Kafka, RabbitMQ 等訊息訂閱/發布服務(pub-sub)

直接看以下程式碼可能會更容易理解,以下程式碼使用 Redis pub-sub 功能,訂閱事件以觸發 SSE:

r = redis.Redis(host='localhost', port=6379, db=0)

@app.route('/stream')
def stream():
    def push_events():
        sub = r.pubsub()
        sub.subscribe('the_channel_you_want')
        while True:
            message = sub.get_message()
            if message:
                # do something with the message
                data = ...
                yield f'data:{data}\n\n'
            time.sleep(1)

    return push_events(), {'Content-Type': 'text/event-stream'}

上述程式碼中的 the_channel_you_want 就是後端接收事件的頻道(channel),我們可以藉由發布訊息到該頻道觸發 SSE (設計上可以不同使用者訂閱不同頻道,或者訂閱相同頻道,端看需求為何)。

p.s. 本範例僅供教學參考,實際上設計也要注意每個訊息訂閱/發布服務都有連線數上限,過多的 clients 也會達到連線數上限

Server-Sent Events 的問題

如前文所述,建立 EventSource 之後瀏覽器會與 server 建立長期連線,如果 Sever 端是採用多執行緒架構,就會有 1 個執行緒被佔用;如果 Server 端是採用多 processes 架構,就會有 1 個 process 被佔用。

佔用的情況會導致 server 一直有運算資源無法釋放,最終造成無法服務其他 clients 。

如果想模擬這情況,可以把前述範例的 app.run() 改為下列程式碼,模擬僅有 1 個 process 單執行緒的情況:

app.run(debug=True, processes=1, threaded=False)

改成上述形式之後,可以使用 2 個終端機各自輸入 curl http://127.0.0.1:5000/stream ,就會發現其中 1 個指令始終無法收到資料,只有當另 1 個指令結束之後,才能收到資料,這就是 process 被佔用導致無法服務其他 clients 的情況,這個情況也間接揭露 SSE 在面對巨量連線時可能會產生效能瓶頸的缺點。

所以後端架構如果要加上 SSE 功能,最好與一般的 API server 分開,避免 SSE 佔用運算資源的情況發生,也避免影響一般的 API 服務。

總結

SSE 是 1 個介於 polling 與 WebSocket 的技術,是相對 WebSocket 輕量的技術,允許伺服器單向持續地向客戶端發送資料,而不需要客戶端持續發送請求詢問,這對於需要即時更新的應用場景非常有用,例如股票行情、聊天應用、即時通知等。

不過 SSE 佔用連線、運算資源的特性,也使得它在多連線數的情況下可能會產生效能瓶頸,因此實作 SSE 時須多加考慮過多連線的情況下應該如何處理。

以上!

Enjoy!

References

HTML Standard - Server-Sent Events

Streaming Contents — Flask Documentation (2.3.x)

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!