用 Python 學網路程式設計重要概念 — 從 asyncio 到 asyncio 混搭 Multi-process

Last updated on  Jul 25, 2024  in  Python 程式設計 - 中階  by  Amo Chen  ‐ 10 min read

「 asyncio 就是快」應該是許多人對於 Python asyncio 的認知,但實際上 asyncio 就跟眾多技術一樣並不完美,它也有不擅長以及適合它的應用場景,認識這些它的缺點與優點將可以讓你在不同的應用場景上做出正確的技術決策。

本文將使用 asyncio 高階函式帶大家體驗用 asyncio 開發網路應用的感覺,並打破「 asyncio 就是快」的迷思,從而看到 asyncio 的本質!

本文環境

  • Python 3

前言

本文需要讀者具備 I/O 多工、多執行緒、 multi-process 與 asyncio 等概念有基礎認識,如果對前述概念不夠熟悉,建議閱讀下列文章後再閱讀本文:

本文將使用高階的 asyncio 模組與 socketserver 模組,節省撰寫低階 socket 的程式碼的過程,將可以更專注 asyncio 的部份。

為什麼 asyncio 適合 I/O 類型的網路服務?

網路服務在處理 request 與 response 時,通常有大量的時間花在處理 I/O 相關事務,例如接收、讀取網路傳來的資料或是透過網路發送資料等。這些 I/O 工作使得 CPU 也必須等待,使得 CPU 無法執行其他工作,造成等待期間閒置寶貴的運算資源。

asyncio 則是藉由 coroutine 與 event loop 的力量,讓 CPU 能在等待期間切換執行其他工作,並稍後再切換回來確認先前工作是否已有結果。如果沒有結果就再切換執行其他工作,讓人感受就像同時執行很多工作一樣,但其實是利用以往閒置的時間執行其他工作而已。

因此, asyncio 尤其適合一些需要處理大量 I/O 操作的網路服務,例如圖片上傳、下載和聊天應用,因為它解決了 CPU 在等待 I/O 完成時閒置運算資源的問題。

p.s. 如果你的網路服務是需要 CPU 密集計算的話,那麼就不適合使用 asyncio 。

asyncio event loop 與執行緒的關係

許多對 asyncio 不夠熟悉的初學者,很可能會誤會 asyncio 是不同於執行緒、 process 的技術,但實際上 asyncio 的運作是靠 event loop, 而 event loop 本質上也僅僅只是在單一執行緒中執行的 while 迴圈。

了解這一點後,我們會知道撰寫 asyncio 的 Python 程式碼本質上屬於單執行緒的架構。然而,在這基礎之上,我們仍能導入多執行緒、 multi-process 等技術架構。

asyncio-process-thread-event-loop.png

asyncio 高階函式簡介

理解 asyncio 基本概念之後,我們可以使用 Python 所提供的 asyncio 高階函式幫助我們開發網路伺服器,這些函式包含:

上述函式幫開發者把利用 asyncio 開發伺服器必須注意的實作細節都包裝好了,讓開發者可以專注在處理 request 與 response 的邏輯上就好。

做個簡單的 asyncio HTTP 伺服器

接下來,我們用 asyncio 高階函式做個極簡單的 HTTP 伺服器,以下是其 Python 程式碼:

server.py

import asyncio


async def handle_request(reader, writer):
    # Read the incoming HTTP request
    request = await reader.read(1024)
    request = request.decode('utf-8')
    print(f"Received request:\n{request}")

    # Parse the request (basic parsing to extract the request line)
    request_line = request.split('\r\n')[0]
    method, path, version = request_line.split()
    print(f"Method: {method}, Path: {path}, HTTP Version: {version}")

    # Create a basic HTTP response
    response = (
        "HTTP/1.1 200 OK\r\n"
        "Content-Type: text/html\r\n"
        "Connection: close\r\n"
        "\r\n"
        "<html><body><h1>Hello, asyncio server!</h1></body></html>\r\n"
    )

    # Send the HTTP response
    writer.write(response.encode('utf-8'))
    await writer.drain()
    print("Response sent")

    # Close the connection
    writer.close()
    await writer.wait_closed()
    print("Connection closed")


async def http_server():
    # Create a server that listens on port 65432
    server = await asyncio.start_server(handle_request, '127.0.0.1', 65432)
    addr = server.sockets[0].getsockname()
    print(f"Serving on {addr}")

    async with server:
        await server.serve_forever()


# Run the main function
asyncio.run(http_server())

上述程式執行指令如下:

$ python server.py

執行成功之後可以打開瀏覽器,網址輸入 http://127.0.0.1:65432/ 就可以看到類似以下畫面:

asyncio-server.png

以下重點說明上述程式碼。

首先, asyncio.start_server() 高階函式已經被簡化成開發者只需傳入 3 個參數:

  1. 如何處理 connection (或 request) 的 callable 或者 coroutine 函式,此參數稱為 client_connected_cb callback, 對應程式碼定義的 coroutine handle_request
  2. 伺服器需要監聽的位址,對應程式碼中的 127.0.0.1
  3. 伺服器需要監聽的通訊埠,對應程式碼中的 65432
server = await asyncio.start_server(handle_request, '127.0.0.1', 65432)

asycnio.start_server() 會回傳 1 個 asyncio.Server 類別的實例,該類別 Python 3.7 之後有實作 asynchronous context manager 的相關方法,所以可以使用 async with 語法使用 asynchronous context manager 的功能。最後,我們呼叫 server 實例(instance)的 serve_forever() 方法,讓伺服器開始接受連線提供服務:

async with server:
    await server.serve_forever()

眼尖的讀者應該會發現,使用 asyncio 高階函式開發伺服器的步驟,跟使用 socketserver 模組的方式相當類似,以下是 socketserver 的版本:

HOST, PORT = "127.0.0.1", 65432
with socketserver.TCPServer((HOST, PORT), HTTPRequestHandler) as server:
    server.serve_forever()

除了使用方法相似之外,因為 asyncio 的高階函式都是 coroutine 與 asyncio.Server 類別也定義不少 coroutine 方法,所以必須使用 await 關鍵字呼叫:

coroutine.png

再回過頭來看處理連線(或 request )的 coroutine handle_request

asyncio.start_server() 規定 client_connected_cb callable 或 coroutine function 會收到 2 個參數:

  1. reader
  2. writer

這 2 個分別代表 StreamReader 與 StreamWriter 類別的實例,用來讀取資料與寫入資料,可以對應 request 與 response 。

這就是為何 handle_request() coroutine 需要 2 個參數的原因。

async def handle_request(reader, writer):
    ...

同樣地,這 2 個類別屬於 asyncio 相關類別,所以這 2 個類別也都有實作一些 coroutine 方法,因此需要特別注意用 await 語法呼叫的情況。

handle_request() coroutine 做的事情也很簡單:

  1. 讀取 request 的內容,對應 await reader.read(1024) 的部分。

  2. 回應客戶端 HTML 格式資料,對應程式碼:

    writer.write(response.encode('utf-8'))
    await writer.drain()
    
  3. 關閉連線,對應程式碼:

    writer.close()
    await writer.wait_closed()
    

有人可能會疑問為何 writer.write()writer.close() 之後,都還要額外加 1 個 await writer.drain() 以及 await writer.wait_close() ,這是 Python 官方文件建議的緣故。

drain.png

而底層原因是當使用 writer.write() 進行資料寫入時,資料會被放入內部的緩衝區, drain() coroutine 可以在資料緩衝區(buffer)滿的時候,先阻擋其他寫入操作,直到有足夠空間可以寫入時,再恢復寫入操作,這個技術稱為 flow control ,其相關的邏輯實作在 FlowControlMixin 類別與 _FlowControlMixin 類別中。

wait_closed.png

await writer.wait_closed() 則是僅是用來確保 writer 正確關閉的機制, coroutine wait_closed() 只是 await 1 個 Future object 而已(原始碼),實際上在呼叫 writer.close() 時,相關連線就會被關閉(可以一路追蹤到底層的 _SelectorTransport 原始碼),然後它就會呼叫 StreamReaderProtocolconnection_lost() 方法,該方法會為 writer.wait_closed() 裡的 Future object 設定 result, 如此一來該 Future object 就不會處於等待狀態,代表 writer 正常結束。

覺得此處難以理解而跳過的話,並不會有任何影響,原則上只要按照官方文件所寫做好 2 件事即可:

  • writer.write(data) 之後,也要 await writer.drain()
  • writer.close() 之後,也要 await writer.wait_closed()

與單執行緒版本 HTTP 伺服器比較

前文提到 Python asyncio 的本質是單執行緒架構,所以我們可以做件有趣的事——進行 asyncio 版本的 HTTP 伺服器與單執行緒版本的 HTTP 伺服器的效能比較

socketserver 版本的 HTTP 伺服器

以下是使用 socketserver 模組實作的 HTTP 伺服器:

single_thread_http_server.py

import socketserver


class HTTPRequestHandler(socketserver.StreamRequestHandler):
    def handle(self):
        self.request.recv(1024)
        self.wfile.write(
            (
                b"HTTP/1.1 200 OK\n"
                b"Content-Type: text/html\n\n"
                b"<html>"
                b"<head><title>Example</title></head>"
                b"<body><h1>Hello, World!</h1></body>"
                b"</html>\n"
            )
        )

class Server(socketserver.TCPServer):
    allow_reuse_address = True
    request_queue_size = 1000

if __name__ == "__main__":
    HOST, PORT = "127.0.0.1", 65432
    with Server((HOST, PORT), HTTPRequestHandler) as server:
        print("Server started on", HOST, "port", PORT)
        server.serve_forever()

上述程式碼需要注意繼承 socketserver.TCPServer 並覆寫(override)屬性 request_queue_size 為 1,000 的地方,這是由於預設的 request_queue_size 為 5, 用 Apache ab 指令進行壓力測試時,如果瞬間 requests 數超過 queue 的大小,將很容易引起以下錯誤:

This is ApacheBench, Version 2.3 <$Revision: 1903618 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
apr_socket_recv: Connection reset by peer (54)

所以我們需要調整 request_queue_size 的數值, 1000 是我們測試的 request 總數。

asyncio 版本的 HTTP 伺服器

以下程式碼則為 asyncio 版本的 HTTP 伺服器:

asyncio_http_server.py

import asyncio


async def handle_request(reader, writer):
    # Read the incoming HTTP request
    await reader.read(1024)

    # Create a basic HTTP response
    writer.write(
        b"HTTP/1.1 200 OK\n"
        b"Content-Type: text/html\n\n"
        b"<html>"
        b"<head><title>Example</title></head>"
        b"<body><h1>Hello, World!</h1></body>"
        b"</html>\n"
    )
    await writer.drain()

    writer.close()
    await writer.wait_closed()


async def http_server():
    # Create a server that listens on port 65432
    server = await asyncio.start_server(handle_request, '127.0.0.1', 65432)
    addr = server.sockets[0].getsockname()
    print(f"Serving on {addr}")

    async with server:
        await server.serve_forever()


# Run the main function
asyncio.run(http_server())

雖然 asyncio 與 socketserver 兩者底層實作方式不同,不過我們還是盡可能地讓 2 個版本的程式碼維持簡單與一致,以確保不會有太多變數影響兩者的回應速度。

Apache ab 測試指令

以下是本文的 Apache ab 測試指令,模擬 100 個 concurrency 與總數 1,000 的 requests:

$ ab -c 100 -n 1000 http://127.0.0.1:65432

本機 127.0.0.1 測試結果比較

接著,各自執行 1 次前述不同版本的 HTTP 伺服器與 ab 測試指令之後,其比較結果如下:

compare-01.png

上圖左邊是 socketserver 版本,右邊是 asyncio 版本。從結果可以明顯看到 socketserver 單執行緒版本的 HTTP 伺服器效率較好, 99% 的 requests 都在 12ms 內回應完成,而且 RPS (request per seconds) 約在 3,532 左右,吞吐量也明顯比 asyncio 版本來得好。

這個結果頗令人訝異!

因為我們原先預期是 asyncio 在效能方面會佔優勢,但想不到是單執行緒版本效能佔優勢!

單執行緒表現較佳的原因其實也很簡單:

本機(127.0.0.1)環境下,幾乎沒有網路延遲(latency),加上每次回應僅有 84 bytes 的大小,對於單執行緒版本的 HTTP server 來說,每個 request 都能夠在相當短的時間內處理完畢;而 asyncio 版本的 HTTP 則需仰賴 event loop 處理各種事件,這個運作機制先天就有一定的時間支出。因此,在這個測試中,同樣都是單執行緒的架構, asyncio 的表現就來的遜色些。

小結一下,以下 3 個原因使得 asyncio 版本的 HTTP 伺服器表現相對遜色:

  • 低延遲
  • 資料量小
  • Event loop 的固定開銷

實際網路環境測試結果比較 — 延遲的影響

我們將同樣的程式碼搬到 DigitalOcean 上進行測試(受測的機器規格為 512MB / 1 CPU / 地理位置舊金山),同樣用 Apache ab 指令測試兩者表現,其測試結果如下:

compare-02.png

從這次結果可以發現兩者的效能相當接近, RPS 約在 300 左右,而 99% 的回應在 360ms 左右完成,兩者差距相當小,仍無法說明 asyncio 佔效能優勢,但可以確定網路延遲(latency)會對單執行緒 HTTP 伺服器的效能有所影響。

實際網路環境測試結果比較 — 回應資料大小的影響

接著,我們也故意將回應資料的大小做變化並進行測試,以下 2 個版本的程式碼故意將 <h1>Hello, World!</h1> 重複 10,000 次,讓回應資料的大小變成 220 kb

socketserver 版本 HTTP 伺服器

single_thread_http_server.py

import socketserver

data = b"<h1>Hello, World!</h1>" * 10000

class HTTPRequestHandler(socketserver.StreamRequestHandler):
    def handle(self):
        self.request.recv(1024)
        self.wfile.write(
            (
                b"HTTP/1.1 200 OK\n"
                b"Content-Type: text/html\n\n"
                b"<html>"
                b"<head><title>Example</title></head><body>"
                + data +
                b"</body></html>\n"
            )
        )


class Server(socketserver.TCPServer):
    allow_reuse_address = True
    request_queue_size = 1000


if __name__ == "__main__":
    HOST, PORT = "0.0.0.0", 65432
    with Server((HOST, PORT), HTTPRequestHandler) as server:
        print("Server started on", HOST, "port", PORT)
        server.serve_forever()
asyncio 版本 HTTP 伺服器

asyncio_http_server.py

import asyncio

data = b"<h1>Hello, World!</h1>" * 10000

async def handle_request(reader, writer):
    # Read the incoming HTTP request
    await reader.read(1024)

    # Create a basic HTTP response
    writer.write(
        b"HTTP/1.1 200 OK\n"
        b"Content-Type: text/html\n\n"
        b"<html>"
        b"<head><title>Example</title></head><body>"
        + data +
        b"</body></html>\n"
    )
    await writer.drain()

    writer.close()
    await writer.wait_closed()


async def http_server():
    # Create a server that listens on port 65432
    server = await asyncio.start_server(handle_request, '0.0.0.0', 65432)
    addr = server.sockets[0].getsockname()
    print(f"Serving on {addr}")

    async with server:
        await server.serve_forever()


# Run the main function
asyncio.run(http_server())

各自執行 1 次前述不同版本的 HTTP 伺服器與 ab 測試指令之後,其比較結果如下:

compare-03.png

從上圖結果可以看出,在增加回應資料大小的情況下,無論是整體回應時間還是吞吐量,都是 asyncio 佔優勢。但是,兩者在處理較大資料量時的表現,都相較於原本資料僅需回應 84 bytes 時來得不理想。

單執行緒版本的 HTTP 伺服器輸在它必須 1 個要求處理結束之後,才能再處理下 1 個 request, 屬於循序處理的情況,特別是在進行網路資料傳輸時, CPU 也必須等待網路傳輸工作完成,才能處理下 1 個 request 。

而 asyncio 版本的 HTTP 伺服器則可以藉由 event loop 與 asyncronus I/O 的能力,在 1 個 request 尚未處理完成時切換執行其他 request 。例如,進行網路傳輸時, CPU 可以先切換處理其他要求。從這個增加回應資料大小的案例中,可以明顯看到 asyncio 的優勢。

總結一下,從這 2 個案例可以學到幾件事。

在真實網路環境中,當回應資料量較小時,使用 asyncio 的效能可能會跟使用單執行緒的伺服器差不多(或者差了一點),這跟我們直覺上認為的「 asyncio 就是快」有所抵觸。

更適合的說法其實是——「 asyncio 提供開發者一種更經濟、成本更低的 concurrent 的解決方案。也就是説,借助 asyncio 的力量,我們可以不需要寫多執行緒、 multi-process 的程式碼,也能夠做到 concurrent !

asyncio 不夠快這件事情,也有一些文章有提到,例如 Why asynchronous Python code is slower than a regular one 一文提到多數 asyncio 的框架(framework)輸給 syncronous 版本的框架。

Most people understand that async Python has a higher level of concurrency. It would make some sense for that to imply higher performance for common tasks like serving dynamic web sites or web APIs.

Sadly async is not go-faster-stripes for the Python interpreter.

事實就是 synchronous 並沒有那麼糟糕,而 asynchronous 也沒有那麼完美,一切技術都要應用在正確的場景才能發揮真正的價值與優勢。

搭配 multi-process 提升效能!

前述測試中, asyncio 版本的 HTTP 伺服器在增大回應資料量時,可以發現即使它能夠處理比單執行緒多一點的 requests, 但它畢竟還是不如回應小量資料時表現。

為了解決效能問題,我們可以考慮使用多執行緒或者 multi-process 提升它的效能。

接下來,本文將使用 multi-process 提升它的效能。原因除了實作簡單之外,還有 Python 的多執行緒會有 GIL 的限制,使用 multi-process 可以避開 GIL 的限制,從而提升效能(多核心 CPU 會更有感)。

以下是 multi-prcoess 版本的 asyncio HTTP 伺服器程式碼:

import os
import asyncio

data = b"<h1>Hello, World!</h1>" * 10000

async def handle_request(reader, writer):
    # Read the incoming HTTP request
    await reader.read(1024)

    # Create a basic HTTP response
    writer.write(
        b"HTTP/1.1 200 OK\n"
        b"Content-Type: text/html\n\n"
        b"<html>"
        b"<head><title>Example</title></head><body>"
        + data +
        b"</body></html>\n"
    )
    await writer.drain()

    writer.close()
    await writer.wait_closed()


async def pre_fork(server, num_processes):
    for _ in range(num_processes):
        pid = os.fork()
        if pid == 0:  # Child process
            addr = server.sockets[0].getsockname()
            print(f"[PID {os.getpid()}] Serving on {addr}")
            async with server:
                await server.serve_forever()
            os._exit(0)


async def http_server():
    # Create a server that listens on port 65432
    server = await asyncio.start_server(handle_request, '0.0.0.0', 65432)
    await pre_fork(server, 2)
    addr = server.sockets[0].getsockname()
    print(f"[PID {os.getpid()}] Serving on {addr}")

    async with server:
        await server.serve_forever()


# Run the main function
asyncio.run(http_server())

上述程式碼主要多了 pre_fork() coroutine ,用來將 HTTP 伺服器 fork 到新的 process ,作法跟用 Python 學網路程式設計重要概念 — 從多執行緒到 Multi-process, Pre-fork 再到 Multi-process 混搭 Multi-thread一文提到的一樣。

至此,我們已經知道 asyncio 怎麼搭配 multi-process 架構,在此將不多做額外的效能測試,有興趣的讀者可以自行在多核心 CPU 機器內進行測試。

以上就是 asyncio 搭配 multi-process 的方式。

補充另一種作法

上述程式碼使用 asyncio 的 start_server() 高階函式與 os.fork() 函式完成 asyncio + multi-process 的功能。

其實也可以用 multiprocessing 模組與 socket 模組取代 os.fork() 函式,其完整程式碼如下(感謝 KuanJuChenHermanChen 提供):

import os
import asyncio
import multiprocessing
import socket
from datetime import datetime

data = b"<h1>Hello, World!</h1>" * 10000

async def handle_request(reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter):
    print(f"[{datetime.now()}] Handling request in process [PID {os.getpid()}]")

    response = (
        b"HTTP/1.1 200 OK\r\n"
        b"Content-Type: text/html\r\n\r\n"
        b"<html>"
        b"<head><title>Example</title></head><body>"
        + data +
        b"</body></html>\r\n"
    )
    writer.write(response)
    await writer.drain()

    writer.close()
    await writer.wait_closed()

async def start_server(sock):
    server = await asyncio.start_server(handle_request, sock=sock)
    addr = server.sockets[0].getsockname()
    print(f"[PID {os.getpid()}] Serving on {addr}")

    async with server:
        await server.serve_forever()

def run_server(sock):
    asyncio.run(start_server(sock))

def main():
    HOST, PORT = "0.0.0.0", 65432
    num_processes = 4

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((HOST, PORT))
        sock.listen(1000)
        sock.setblocking(False)

        processes: list[multiprocessing.Process] = []
        for _ in range(num_processes):
            p = multiprocessing.Process(target=run_server, args=(sock,))
            p.start()
            processes.append(p)

        for p in processes:
            p.join()

if __name__ == "__main__":
    main()

總結

在這篇文章,我們認識如何用 asyncio 開發伺服器,以及破解坊間對「 asyncio 就是快」的迷思,並了解真正適合 asyncio 的應用場景,避免將 asyncio 技術應用在不適合的場景。

以上!

Enjoy!

References

asyncio — Asynchronous I/O

Python - Streams

Why asynchronous Python code is slower than a regular one

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!