用 Python 學網路程式設計重要概念 — 從單執行緒到 I/O 多工(I/O multiplexing)

Posted on  May 12, 2024  in  Python 程式設計 - 中階  by  Amo Chen  ‐ 15 min read

在學習網路程式設計時,每個人都遇過怎麼讓 sever 可以盡可能地處理更多連線的課題,要達到這個目的,多數人直覺應該會想到運用多執行緒、多 processes 的架構,為每個連線建立 1 個新的執行緒或 process 處理,但是這種方法在處理大量連線時,就顯得捉襟見肘,單就多執行緒的解決方案來說,隨著連線數上升,首先就會遇到 CPU context switch 所造成的效能問題,更遑論多 processes 架構會需要使用比多執行緒架構更多資源的問題。

但現代有很多應用僅用單執行緒就能夠處理龐大的連線數,包含 Nginx, Redis 等知名開源軟體,這些軟體是怎麼做到的呢?這就需要談到 1 種稱為 I/O multiplexing 的技術!

本文將教導 1 種稱為 I/O multiplexing 的技術,該技術能夠讓單執行緒處理多個連線,將效能壓榨出來!

p.s. 這個技術還可以與多執行緒、多 processes 架構混用

本文環境

  • Python 3
  • 核心(Kernel)版本 2.5.45 (含 2.5.45) 以上 Linux 作業系統

本文要求

本文適合對 Python 有基礎認識,而且正在學習網路程式設計的人。

Client-Server 架構 / Client-Server model

在學習網路程式設計時,有 1 個相當重要的概念稱為 Client-Server 架構, 這是現在各種主流網路服務所採用的架構,例如網頁、線上遊戲、雲端空間等等。

client-server.png

在這個架構下,服務提供方作為 Server, 或稱為伺服器,會被動的等待來自使用者的要求,伺服器接到要求之後會開始進行處理,最後再將結果回傳給使用者,而使用者的角色就稱為 Client, 或者客戶。

p.s. 補充說明,除了 Client-Server 架構,另 1 種架構稱為 P2P (peer to peer),在此架構下每個節點(node)會同時擁有 Client 與 Server 的角色,既能對外提供服務,也能向其他節點發出請求

本文將專注於探討 Client-Server 架構。

Echo Server & Client

開始本文之前,介紹本文會一再重複使用的 echo server 與 client 。

Echo server 介紹

Echo server 是 1 個可以用 Python socket 模組實作的伺服器,它唯一的作用就是接收來自 client 傳送的資料,然後又將相同資料傳送回去給 client 。

以下是 echo server 的 Python 程式碼,我們會從此版本開始,不斷地針對它所遭遇的問題進行改進,藉此了解網路程式設計的幾個重要概念:

echo_server.py

import socket


def echo_server(host, port):
    # Create a socket object with IPv4 and TCP protocol
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # Bind the socket to a public host, and a port
    server_socket.bind((host, port))
    server_socket.listen(3)  # start listening for incoming connections

    print(f"Echo server is running on {host}:{port}")

    try:
        while True:
            # Establish connection with client
            client_socket, addr = server_socket.accept()
            print(f"Connected by {addr}")

            try:
                while True:
                    # Receiving data from the client
                    data = client_socket.recv(1024)  # buffer size is 1024 bytes
                    if not data:
                        break  # no more data from the client
                    print(f"Received: {data.decode()} | Client: {addr}")

                    # Echoing back the data to the client
                    client_socket.sendall(data)
            finally:
                client_socket.close()
    finally:
        server_socket.close()


if __name__ == "__main__":
    HOST = "127.0.0.1"
    PORT = 65432
    echo_server(HOST, PORT)

p.s. server_socket.listen(3) 代表 backlog queue, 是 server 所可以容納的仍未被 accept 的連線數

上述範例會在本機(localhost)的 65432 通訊埠執行。

執行指令如下:

$ python echo_server.py

Echo Client 介紹

以下是本文所使用的 client 程式碼,本文會使用該程式碼連線到各個版本的 echo server 進行測試:

echo_client.py

import socket


def echo_client(host, port):
    # Create a socket object
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Connect to the server
    client_socket.connect((host, port))

    try:
        while True:
            # Take input from the user
            message = input("Enter message to send: ")
            if message.lower() == 'exit':
                break

            # Send data to the server
            client_socket.sendall(message.encode())
            print(f'Sent "{message}" to echo server')

            # Receive data from the server
            data = client_socket.recv(1024)  # buffer size is 1024 bytes
            print(f"Received from server: {data.decode()}\n")
    finally:
        client_socket.close()


if __name__ == "__main__":
    HOST = "127.0.0.1"
    PORT = 65432
    echo_client(HOST, PORT)

上述程式碼執行指令如下,請先執行 echo server 再執行以下指令:

$ python echo_client.py

執行成功之後,可以輸入任意文字,並且收到來自 echo server 的回應。

第 1 個問題 — Echo server 沒辦法接受 2 個以上的連線

目前為止的 echo server 程式碼,在只有 1 個 echo client 時,運作非常順暢。

一旦打開另 1 個新的終端機,再執行第 2 個 echo client 時,就會發現第 2 個 echo client 在送出資料之後,就再也沒有任何來自 echo server 的回應,而第 1 個 echo client 反而運作正常,也能收到來自 echo server 的回應。

如果仔細看 echo server 的列印的訊息,會發現根本沒有第 2 個 echo client 連線字串:

$ python echo_server.py
Echo server is running on 127.0.0.1:65432
Connected by ('127.0.0.1', 58551)

當我們把第 1 個 echo client 以 CTRL + c 結束時,又會發現第 2 個 echo client 連上了!

$ python echo_server.py
Echo server is running on 127.0.0.1:65432
Connected by ('127.0.0.1', 58551)
Connected by ('127.0.0.1', 58553)

這就是我們所遭遇的第 1 個問題 — 現有的 echo server 程式碼雖然有用 while 迴圈接受新連線的部分,卻只能處理 1 個連線⋯⋯。

造成這個問題的原因有 2 個。

1. while 無窮迴圈

第 1 個從程式碼層面來說,當我們進到第 2 個 while 無窮迴圈時,根本沒有機會讓程式回到外層接受新的連線,並且處理新連線所發送的資料:

echo-server-problem-1.png

2. 阻塞模式(blocking mode)

網路傳輸是 1 種 I/O 行為,在談到 I/O 的行為時,分為 2 種模式:

  1. 阻塞式(Blocking)
  2. 非阻塞式(Non-blocking)

簡單來說,兩者區別在於會不會卡住 process 做其他事,阻塞(blocking)名符其實,會卡住 process 執行其他工作。原理其實是把控制權交給作業系統,等作業系統完成工作後再繼續執行 process 。

而 socket 預設使用阻塞模式,所以現在 echo server 程式碼主要有多個地方會造成阻塞,例如:

  1. server_socket.accept() 伺服器等待接受新連線
  2. client_socket.sendall(message.encode()) 將資料傳給客戶端
  3. client_socket.recv(1024) 等待從客戶端傳來的資料

這些地方都會讓程式停在那 1 行,直到有新連線建立或者新的資料傳出去或傳進來,才接著繼續跑下面流程。

所以要讓 echo server 能夠處理多個連線的方法,就必須:

  1. 打破第 2 個 while 迴圈的僵局,讓 echo server 有機會接受新連線
  2. 解決阻塞模式造成的等待問題

如果阻塞是問題,那就改成非阻塞!

把 Echo server 改成 Non-blocking 模式

要讓 socket 模組使用 non-blocking 模式,只要呼叫以下 setblocking 方法即可:

<socket object>.setblocking(False)

這 1 行的作用等同於以下 settimeout 方法:

<socket object>.settimeout(0.0)

它的作用其實是只要 socket 操作沒辦法立即完成,就視為失敗(failed):

In non-blocking mode, operations fail (with an error that is unfortunately system-dependent) if they cannot be completed immediately

先在 sever_socket.listen(3) 後加上 server_socket.setblocking(False) 看看會發生什麼事:

server_socket.listen(3)
server_socket.setblocking(False)

接著當我們試圖啟動 echo_server.py 時,會出現類似以下的錯誤:

$ python echo_server.py
Echo server is running on 127.0.0.1:65432
Traceback (most recent call last):
  File "echo_server.py", line 41, in <module>
    echo_server(HOST, PORT)
  File "echo_server.py", line 17, in echo_server
    client_socket, addr = server_socket.accept()
  File "/.../socket.py", line 292, in accept
    fd, addr = self._accept()
BlockingIOError: [Errno 35] Resource temporarily unavailable

從結果可以看到 client_socket, addr = server_socket.accept() 拋出 BlockingIOError

其實原因也很簡單,因為 server_socket.accept() 沒有立即完成,也就是沒有任何 client 在這個時候連線,因此造成 BlockingIOError ,這就是 non-blocking 模式會遇到的問題,沒有立即完成就視為失敗。

要解決這個問題,最暴力的方法就是使用 while 迴圈,瘋狂試到有連線為止!

使用 non-blocking 模式的 echo server 程式碼如下,這個版本我們把 server_socketclient_socket 都改為使用 non-blocking 模式,同時我們把 client_socket 都存到 clients list 中,用 for 迴圈 1 個 1 個詢問客戶端是否有傳送資料上來,詢問完之後,就又回到 while 迴圈一開始,看看有沒有新的連線,如此往復:

non_blocking_echo_server.py

import socket
import time


def echo_server(host, port):
    # Create a socket object with IPv4 and TCP protocol
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(3)
    server_socket.setblocking(False)  # Set server socket to non-blocking

    print(f"Echo server is running on {host}:{port}")

    clients = []
    try:
        while True:
            try:
                # Try to accept a new connection
                client_socket, addr = server_socket.accept()
                print(f"Connected by {addr}")
                client_socket.setblocking(False)
                clients.append(client_socket)
            except BlockingIOError:
                # No client is trying to connect
                pass

            # Iterate over connected clients and read data
            for client in clients[:]:  # Iterate over a copy of the list
                try:
                    data = client.recv(1024)
                    if data:
                        print(f"Received: {data.decode()} | Client: {addr}")
                        client.sendall(data)
                    else:
                        # No more data, close the connection
                        clients.remove(client)
                        client.close()
                except BlockingIOError:
                    # No data available to read
                    pass
            time.sleep(0.1)  # Small delay to prevent CPU spinning
    finally:
        for client in clients:
            client.close()
        server_socket.close()


if __name__ == "__main__":
    HOST = "127.0.0.1"
    PORT = 65432
    echo_server(HOST, PORT)

接著,可以試試看執行多個 echo_client.py 試試,就會發現這個版本的 echo server 可以處理多個連線啦!

But! 又有新問題出現了!

人生有很多 but, 程式也不例外!

前述 non-blocking 版本的 echo server 存在幾個問題:

  1. 使用暴力 while 迴圈也太不優雅了吧!
  2. 如果有上千個客戶連線,每次都需要走訪上千個 client_socket 之後,才能再接受新連線。
  3. 假如1,000 個客戶同時傳送資料,第 1,000 位客戶需要等前面 999 位都被 echo server 回應之後才能收到回應。

總體而言,這個版本存在隨著客戶數量增多,效能會隨之下降的問題。

針對這 3 個問題,下文開始逐個擊破!

select 模組

針對第 1 個問題,其實 Python 文件已經給出答案

In non-blocking mode, operations fail (with an error that is unfortunately system-dependent) if they cannot be completed immediately: functions from the selectmodule can be used to know when and whether a socket is available for reading or writing.

我們可以使用 select 模組, select 模組可以讓我們知道某個 socket 是否處於可以讀取或者可以寫入的狀態!

p.s. Python 官方文件推薦使用 selectors 模組,本文因教學緣故,故選擇使用 select 模組進行

p.s. 也可參考 Python 模組教學 - selectors 一文學習

select 模組主要提供 5 個函式供我們使用:

  1. select()
  2. poll()
  3. epoll() 僅核心(kernel)版本 2.5.44 以上 Linux 作業系統支援
  4. devpoll() 僅限 Solaris 作業系統
  5. kqueue() 多數(most) BSD 系列作業系統有支援

本文僅專注在 select() , poll()epoll() 這 3 個方法,原因是 select() 是大多數作業系統都有支援的 system calls, 而 poll() 被大多數 Unix 系統所支援, epoll() 則是 Linux 核心版本 2.5.44 以上支援的高效能 system call 。

select() , poll() , epoll() 等方法都是為了提供 1 種稱為 I/O multiplexing 的功能。

什麼是 I/O multiplexing (I/O 多工 / 多路複用)?

I/O multiplexing, 又稱 I/O 多工或多路複用(這名字相當不友善啊⋯⋯)。

主要解決的問題是避免使用 while 迴圈不斷輪詢 socket object 來檢查是否可以進行讀取或寫入操作。這種方法將相關的任務交給作業系統來處理,從而使我們的程式碼更為簡潔並提升效能。

select() , poll()epoll() 是 I/O multiplexing 的 3 種主要 system calls, 它們的發展順序分別是 select() 最早,其次是 poll(),最後則是 epoll()

簡單來說, poll() 改善了 select() 的缺點, epoll() 又改善了 poll() 的缺陷,是一個逐步演進的過程。

在此先不介紹 3 種方法的差異,先各自以 3 種方法實作 echo server 看看,先求有個感覺。

select.select() 簡介

總之,先認識一下最早發展的 select() 吧!

Python select 模組的 select() 對應的就是 select() 這個 system call, 其使用參數如下:

select.select(rlist, wlist, xlist[, timeout])

可以看到 select() 方法至少需要 rlist, wlist, xlist 3 個參數,這 3 個參數都必須是 iterables of “waitable objects”

waitable object 必須是代表 file descriptor(fd) 的整數,或者是具有 fileno() 方法的物件, fileno() 方法也必須回傳代表 file descriptor 的整數。

rlist, wlist, xlist 3 個參數分別代表:

  1. rlist: wait until ready for reading,
  2. wlist: wait until ready for writing
  3. xlist: wait for an exceptional condition

也就是說,需要請作業系統幫我們探知哪些 file descriptors 處於可以讀取的狀態的話,就是放到 rlist 這個參數;請作業系統幫我們探知哪些 file descriptors 處於可以寫入的狀態的話,就放到 wlist 這個參數;請作業系統幫我們探知哪些 file descriptors 處於例外或發生錯誤的情況的話,就放到 xlist 這個參數。

以 echo sever 的用途來看,我們只需要帶入正確的 rlistwlist 即可, xlist 則可以帶入 [] 空 list 。

select() 會回傳 1 個長度為 3 的 tuple, 分別代表rlist, wlistxlist 中哪些處於 ready 的狀態:

The return value is a triple of lists of objects that are ready: subsets of the first three arguments.

以 echo server 的用途來看,我們只需要前 2 個回傳值即可,也就是讀取跟寫入。

所以 select.select() 使用方法會類似:

while True:
    readables, writables, _ = select.select(rlist, wlist, [])

用 select() 進一步改善 echo server

為了對 I/O multiplexing 有更進一步的認識,我們先把 echo server 用 select() 改寫,感受 select() 在做的事,以及改善後的樣子是否比起使用 while 迴圈暴力輪詢的方法來的優雅。

select_echo_server.py

import select
import socket
from collections import defaultdict, deque


def echo_server(host, port):
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(5)
    server_socket.setblocking(False)
    print(f"Echo server is running on {host}:{port}")

    echo_queue = defaultdict(deque)
    # Lists of sockets for select
    rlist = [server_socket]
    wlist = []
    try:
        while True:
            # Wait for at least one of the sockets to be ready for processing
            readable, writable, _ = select.select(rlist, wlist, [])

            for s in readable:
                if s is server_socket:
                    client_socket, addr = server_socket.accept()
                    print(f'Connected by {addr}')
                    client_socket.setblocking(False)
                    rlist.append(client_socket)
                    wlist.append(client_socket)
                else:
                    # Handle incoming data on a connected socket
                    if data := s.recv(1024):
                        print(f"Received: {data.decode()}")
                        if s in writable: # Read for write
                            s.sendall(data)
                        else:
                            echo_queue[s].append(data)
                    else:
                        # Interpret empty result as closed connection
                        wlist.remove(s)
                        rlist.remove(s)
                        if s in echo_queue:
                            echo_queue.pop(s)
                        s.close()

            for s in writable:
                if s in wlist and len(echo_queue[s]) > 0:
                    data = echo_queue[s].popleft()
                    s.sendall(data)
    finally:
        server_socket.close()


if __name__ == "__main__":
    HOST = "127.0.0.1"
    PORT = 65432
    echo_server(HOST, PORT)

重點說明一下上述使用 select() 的 echo server 。

首先,我們把 server_socket 放到 rlist ,這是因為 server 接受新連線這個行為屬於讀取, wlist 一開始則設定為 [] 代表目前沒有客戶連線,自然也不需要寫入資料到客戶的 socket object 。

p.s. sever_socket 屬於 waitable object, 因為它具有 fileno() 方法,呼叫該方法可以取的 file descriptor

接著 while 迴圈一開始就使用 readable, writable, _ = select.select(rlist, wlist, []) 取得哪些 waitable objects 處於可以讀取與可以寫入的狀態,再來就是用 2 個 for 迴圈分別對每個 waitable object 做讀取與寫入的操作。

處理走訪 readable 的部分:

  • 如果 waitable object 是 server_socket 就代表有新的連線要求,所以呼叫 accept() 方法之後,把 client_socket 分別加到 rlistwlist , rlist 是因為需要讀取客戶端傳送什麼資料, wlist 則是因為要將資料送回去給客戶端,如此一來,下次執行 select.select(...) 時,作業系統就會幫我們查看客戶的 socket object 是否可以讀取或寫入。
  • 如果 waitable object 不是 server_socket , 就代表它是客戶的 socket object, 所以我們可以呼叫 recv(1024) 方法讀取資料,如果此時客戶的 socket object 處於可以寫入的狀態,就可以先送回資料,如果不能寫入,就先把收到的資料存到 echo_queue 之中,在之後處理 writablefor 迴圈再處理,換句話說, readable 狀態不代表它也會是 writeable, 這也是為什麼額外需要 echo_queue 處理還不能寫入的情況。

處理走訪 writable 的部分:

  • 檢查每個處於可以寫入狀態的客戶 socket object 是否有資料要寫入,有的話就呼叫 sendall() 把資料傳給客戶

目前為止,可以明顯看到用 select.select() 實作的 echo server 改善 2 件事:

  1. 不再需要用 while 無窮迴圈暴力輪詢 server socket 與 client sockets 是否處於可以讀取或寫入的狀態。
  2. 接收資料時不需要用 for 迴圈走訪所有 client sockets 詢問是否有資料可以讀取, select.select() 會在有資料可以讀取時才通知我們。

select.poll() 簡介

學會 select.select() 之後,緊接著認識一下 select.poll()

select.poll()

(Not supported by all operating systems.) Returns a polling object, which supports registering and unregistering file descriptors, and then polling them for I/O events; see section Polling Objects below for the methods supported by polling objects.

呼叫 select.poll() 時會回傳 1 個 Polling Object , 我們可以跟這個 polling object 註冊(register)需要關注哪些 file descriptor 的哪些狀態。當然,也可以取消註冊。

所以 select.poll() 使用方法會類似:

polling = select.poll()
polling.register(fd[, eventmask])

上述參數 eventmask 是我們想關注的狀態,例如可以讀取、可以寫入等等,所有 events 可以閱讀 poll.register() 方法的說明,如果不指定的話,預設會關注的 events 有 3 種:

  1. POLLIN , 代表有資料可以讀取。
  2. POLLPRI , 代表有 urgent data 可以讀取, urgent data 是 TCP 網路協定裡的 1 種機制,可以為資料標上 urgent, 代表這份資料比起其他資料更緊急,不過要如何處理 urgent data 還是接收端的事,如果接收端沒有對 urgent data 做特別處理,即使是 urgent data 也沒任何效果。
  3. POLLOUT , 代表可以寫入資料。

以 echo server 的用途來看,我們只需要關注 POLLINPOLLOUT 即可,也就是讀取跟寫入。

值得注意的是,這些 events 都是 bitmask, 如果想同時關注多個 events 的話,可以使用 bitwise OR 運算子 | 結合起來。

以下是同時關注 POLLINPOLLOUT event 的範例:

polling.register(fd, POLLIN | POLLOUT)

如果要接收來自作業系統的通知,則是呼叫 Polling Object 的 poll() 方法,該方法會回傳 1 個 list, list 裡的每個元素都是長度為 2 的 tuple, 第 1 個元素為 file descriptor (整數), 第 2 個則是 event:

[
   (fd1, event),
   (fd2, event),
   ...
]

所以我們只要使用 for 迴圈走訪 select.poll() 所回傳的 list, 就可以知道哪些 file descriptor 各自處於什麼狀態。

完整的 select.poll() 的使用樣貌會類似:

polling = select.poll()
polling.register(fd[, eventmask])
while True:
    for fd, event in polling.poll():
        ...

用 poll() 實作 echo server

同樣用 select.poll() 實作 echo server 看看。

以下是使用 select.poll() 實作 echo server 的程式碼:

poll_echo_server.py

import select
import socket
from collections import defaultdict, deque


def echo_server(host, port):
    # Create a server socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(3)
    server_socket.setblocking(False)

    print(f"Echo server is running on {host}:{port}")
    # Create a polling object
    polling = select.poll()
    # Register the server socket to monitor for "read" events
    polling.register(server_socket, select.POLLIN)

    # Map file descriptors to socket objects
    fd_to_socket = {server_socket.fileno(): server_socket}
    echo_queue = defaultdict(deque)
    try:
        while True:
            for fd, event in polling.poll():
                s = fd_to_socket[fd]
                if event & select.POLLIN:  # Ready to read
                    if s is server_socket:
                        client_socket, addr = s.accept()
                        print(f"Connected by {addr}")
                        client_socket.setblocking(False)
                        polling.register(client_socket, select.POLLIN | select.POLLOUT)
                        fd_to_socket[client_socket.fileno()] = client_socket
                    else:
                        # Read data
                        if data := s.recv(1024):
                            print(f"Received: {data.decode()}")
                            if event & select.POLLOUT: # Ready to write
                                 s.sendall(data)
                            else:
                                 echo_queue[s].append(data)
                        else:
                            polling.unregister(s)
                            fd_to_socket.pop(s.fileno())
                            s.close()
                elif event & select.POLLOUT: # Ready to write
                    if len(echo_queue[s]) > 0:
                        data = echo_queue[s].popleft()
                        s.sendall(data)
    finally:
        # Unregister and close the server socket
        polling.unregister(server_socket)
        server_socket.close()


if __name__ == "__main__":
    HOST = "127.0.0.1"
    PORT = 65432
    echo_server(HOST, PORT)

上述程式碼能觀察到使用 select.poll() 實作 echo server 的程式碼邏輯大致相同。

不過,還是有幾點不一樣:

  • 已經不需要用 2 個 for 迴圈分別處理 readablewritable ,原因在於這些狀態被整合在 bitmask 的變數 event 內,我們可以用 & 運算子判斷屬於何種 event 。
  • select.poll() 回傳的是 file descriptor, 不像 select.select() 是 socket object, 所以我們需要額外的 dictionary 儲存 file descriptor 與 socket object 的對應。
  • 我們不需要像使用 select.select() 那樣需要維護 rlist , wlist 等變數,可以直接呼叫 polling object 的 register() 方法告訴 polling object 哪些 file descriptor 需要關注哪些 events 就好。

整體而言,比起使用 select.select() ,使用 select.poll() 的程式碼來得更簡潔一些。

select.epoll() 簡介

最後看看 select.epoll()

select.epoll() 是 3 個方法中效能最好的,不過僅有核心版本 2.5.45 (含 2.5.45) 以上的 Linux 作業系統支援。

p.s. 查看核心版本的指令為 uname -r

基本上, Python 的 select.epoll() 的使用方式與 select.poll() 相似:

epoll = select.epoll()
epoll.register(fd[, eventmask])
while True:
    for fd, event in epoll.poll():
        ...

p.s. 雖然使用方法相似,但其實 epoll() 底層實作共有 3 個 API, 只是 Python 的 select 模組包裝得很好才與 select.poll() 相似

不過 select.epoll() 回傳的是 edge polling object , 所以它多了幾個 polling object 所沒有的方法,例如 fileno() , fromfd()

p.s. 看到 select.epoll()flieno() 方法,就代表它本身其實也是 1 個 file descriptor / waitable object

另外, select.epoll() 的 eventmask 也長得不一樣,甚至多好多個 select.poll() 所沒有的 eventmasks, 完整的 eventmasks 請閱讀官方文件select.poll() 所定義的 3 個預設 eventmasks, 在 select.epoll() 也有相對應的 eventmasks:

  1. EPOLLIN , 代表有資料可以讀取。
  2. EPOLLPRI , 代表有 urgent data 可以讀取, urgent data 是 TCP 網路協定裡的 1 種機制,可以為資料標上 urgent, 代表這份資料比起其他資料更緊急,不過要如何處理 urgent data 還是接收端的事,如果接收端沒有對 urgent data 做特別處理,即使是 urgent data 也沒任何效果。
  3. EPOLLOUT , 代表可以寫入資料。

其實就是加了 E 在最開頭而已。

除了上述提到的 3 種 eventmasks 之外,還有個重要的 eventmask 需要認識,那就是 EPOLLET

EPOLLET 的原文說明是:

Set Edge Trigger behavior, the default is Level Trigger behavior

相信多數人看到這句話就會冒出一大堆問號 — Edge Trigger behavior? Level Trigger behavior?

白話文解釋 Edge Triggered 與 Level Triggered

原來 epoll() system call 支援 2 種不同的 event 通知方式,分別是:

  • Edge Triggered
  • Level Triggered

這 2 者的通知方式,在沒有設定的情況下,預設會使用 Level Triggered

那 edge triggered 與 level triggered 到底什麼意思?

Level triggered 只要滿足發送 event 的條件, select.epoll() 就會一直通知有 event, 儘管這個 event 之前已經發送過。

Edge triggered 則是當滿足發送 event 的條件時,就僅發送 1 次 event ,之後就不會針對相同的 event 發出通知。

下列範例 Sever 程式碼可以幫助我們了解 level triggered 與 edge triggered 的區別,以下程式碼只會對變數 server_socket 監聽是否處於有資料能讀取的狀態,一旦知道有連線進來,也不會做任何處理,只會列印 You received an EPOLLIN event 字串,由於我們沒有設定 EPOLLET eventmask, 所以 epoll() 預設會使用 level triggered 通知我們:

import socket
import select

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('127.0.0.1', 65432))
server_socket.listen(3)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.setblocking(False)
print(f"Server is running on 127.0.0.1:65432")
epoll = select.epoll()
epoll.register(server_socket, select.EPOLLIN)
while True:
    for _, event in epoll.poll():
        if event & select.EPOLLIN:
            # don't accept any connections
            print('You received an EPOLLIN event')

當我們用 echo_client.py 對上述 server 送出任意字串之後,就會發現 server 瘋狂印出 You received an EPOLLIN event , 因為 server_socket 一直處於有資料能讀取的狀態。這就是 level triggered 的作用,只要 file descriptor 狀態仍滿足條件,它就會持續通知我們。

接著體驗 edge triggered 一下。

我們將 epoll.register(server_socket, select.EPOLLIN) 改成 epoll.register(server_socket, select.EPOLLIN | select.EPOLLET) 就能夠使用 edge triggered:

import socket
import select

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('127.0.0.1', 65432))
server_socket.listen(3)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.setblocking(False)
print(f"Server is running on 127.0.0.1:65432")
epoll = select.epoll()
epoll.register(server_socket, select.EPOLLIN | select.EPOLLET)
while True:
    for _, event in epoll.poll():
        if event & select.EPOLLIN:
            # don't accept any connections
            print('You received an EPOLLIN event')

當我們用 echo_client.py 對上述 server 送出任意字串之後,就會發現 server 只印出 1 次 You received an EPOLLIN event 。這就是 edge triggered 的作用,即使 file descriptor 狀態仍滿足條件,它也只會通知我們一次。

這就是使用 epoll() 必須注意的 edge triggered 與 level triggered 。

用 select.epoll() 實作 echo server

最後,看一下使用 select.epoll() 實作的 echo server 程式碼:

epoll_echo_server.py

import select
import socket
from collections import defaultdict, deque


def epoll_echo_server(host, port):
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(3)
    server_socket.setblocking(False)
    print(f"Echo Server is running on {host}:{port}")

    epoll = select.epoll()
    epoll.register(server_socket.fileno(), select.EPOLLIN)
    fd_to_socket = {server_socket.fileno(): server_socket}
    echo_queue = defaultdict(deque)
    try:
        while True:
            for fd, event in epoll.poll():
                s = fd_to_socket[fd]
                if event & select.EPOLLIN:
                    if s is server_socket:
                        client_socket, addr = server_socket.accept()
                        print(f"Connected by {addr}")
                        client_socket.setblocking(False)
                        epoll.register(client_socket, select.EPOLLIN | select.EPOLLOUT)
                        fd_to_socket[client_socket.fileno()] = client_socket
                    else:
                        # Ready for reading
                        if data := s.recv(1024):
                            if event & select.EPOLLOUT:
                                s.sendall(data)
                            else:
                                echo_queue[s].append(data)
                        else:
                            epoll.unregister(fd)
                            s.close()
                elif event & select.EPOLLOUT:
                    if s is not server_socket and len(echo_queue[s]) > 0:
                        data = echo_queue[s].popleft()
                        s.sendall(data)
    finally:
        epoll.unregister(server_socket)
        epoll.close()
        server_socket.close()


if __name__ == "__main__":
    HOST = "127.0.0.1"
    PORT = 65432
    epoll_echo_server(HOST, PORT)

上述程式碼幾乎與使用 select.poll() 的 echo server 範例相同,就不多做解釋了。

select() vs. poll() vs. epoll()

至此,我們已經學會如何使用 select() , poll()epoll() 3 種不同方法實作具備 I/O multiplexing 功能的伺服器。

雖然 select() , poll()epoll() 是逐步演進的過程,底層各自有不同的實作方式(詳見 select v.s. poll v.s. epoll 1 文),不過 3 種方法也有各自的優缺點。

select() 的優缺點

優點:

  • 幾乎所有的作業系統都支援 select() system call, 如果想要讓應用程式具備跨平台能力執行的能力,又不想花時間處理不同平台要呼叫什麼 system call 的話,可以考慮使用 select() 就好。

缺點:

  • 就如同範例所示,使用 select() 需要自行維護 rlist, wlistxlist 3 個參數,不像使用 poll()epoll() 呼叫 register()unregister() 等方法,就能夠請作業系統幫忙輪詢 file descriptor 狀態。
  • 因為底層實作方式的緣故,效能比 poll()epoll() 差。主要是 select() 輪詢的方式造成此問題,例如我們想關注 3 個 file descriptors, 其數字為 {1, 3, 1023} 的話, select() 的輪詢方式會從 0 開始問到 1023+1, 這個輪詢的最大值由 {1, 3, 1023} 的最大值所決定,如果我們想關注的 file descriptor 剛好有 1 個數字很大,那勢必會造成執行速度比較慢。時間複雜度屬於 O(n) 。
  • 承上所述, select() 輪詢的最大值有限制,通常是 1024 ,如果你的 file descriptor 數字大於 1023, 就只能選擇使用 poll()epoll()

poll() 的優缺點

優點:

  • macOS, Linux 與 Unix 都有支援 poll() system call ,仍可跨多數作業系統。
  • 效能比起 select() 來得更好,因為不是採用 select() 的輪詢方式。

缺點:

  • Windows 不支援 poll() system call 。
  • poll() 只支援 level triggered, 不像 epoll() 還有額外支援 edge triggered 。
  • 雖然不是採用 select() 的輪詢方式,但是在輪詢方式的時間複雜度仍屬於 O(n), 也就是說假設監控 100 個 file descriptor 的話, poll() 也是需要 100 個 file descriptor 都問過一遍才能知道狀態,因此效能也是隨著連線數增加而下降。時間複雜度仍屬於 O(n) 。

epoll() 的優缺點

優點:

  • 高效能!與 poll() 的輪詢方式不同, epoll() 採用 callback 的方式得知 file descriptor 的狀態,例如 socket 收到資料之後,會呼叫 epoll() 的 callback, 藉此告訴 epoll() 有 socket 處於可以讀取的狀態,所以不需要 poll() 那樣輪詢。時間複雜度屬於 O(1) ,能夠處理大量連線, Nginx, Redis 都有使用 epoll() 改善效能。

缺點:

  • 僅有核心版本 2.5.45 (含 2.5.45) 以上的 Linux 作業系統支援 epoll()

總結

學習網路程式設計概念最好的辦法,是實際從 1 個簡單的 server 開始,直面它所遭遇的問題,然後逐步解決!

在這篇文章中,我們至少認識了單執行緒、阻塞式、非阻塞式、I/O 多工等概念,而且你會發現即使我們沒有使用任何多執行緒、多 processes 技術,單單 1 個 Python process (single thread)也能夠做到處理多個連線。

阻塞式、非阻塞式、I/O 多工都是網路程式設計相當重要的概念,值得花些時間認識,未來在征服星辰大海的時候,你一定會發現,對這些基礎概念的深刻理解,是你最堅實的靠山!

以上!

Enjoy!

References

socket — Low-level networking interface

select — Waiting for I/O completion

Linux – IO Multiplexing – Select vs Poll vs Epoll – Developers Area

select v.s. poll v.s. epoll

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!