你知道 asyncio 的 event loop 是怎麼 loop 的嗎?談 event loop 如何處理 I/O 工作
Posted on Jul 11, 2024 in Python 程式設計 - 高階 by Amo Chen ‐ 8 min read
在「你知道 asyncio 的 event loop 是怎麼 loop 的嗎?談 event loop 的排程與執行」一文中,我們談了 asyncio 是如何進行工作排程與執行工作,不過沒有提到它如何處理 I/O 工作的細節(例如網路傳輸)。
本文將從知名 ASGI Web Server 開源專案 uvicorn 開始,搭配 Python asyncio 官方文件與簡單的範例,一步步介紹 event loop 是如何處理 I/O 工作。
前言
本文是「你知道 asyncio 的 event loop 是怎麼 loop 的嗎?談 event loop 的排程與執行」的延伸之作,專注於講解 asyncio event loop 的運作與細節,對 asyncio event loop 不熟悉的讀者,強烈建議請先閱讀該文,建立對 asyncio event loop 的正確認知,以對本文有更好的理解與掌握能力。
從知名的開源專案 uvicorn 出發
絕大多數情況下,使用 asyncio 是不需要特意去理解它底層如何做 I/O 的工作,因為 Python 已經將 event loop 底層打理得相當不錯,站在巨人肩膀上,我們可以更專注在程式的核心開發工作上。
不過深入理解 asyncio 的 event loop 仍能給我們帶來一些好處,除了能提高對 asyncio 的掌握能力之外,也能夠在有需要時知道如何改寫底層(low-level)功能,實作更高效能的 asyncio 應用,例如知名的 ASGI Web Sever 專案 uvicorn, 即是透過實作 asyncio 的 Protocol 讓 ASGI Web Server 能夠支援 HTTP/1.1 與 WebSocket 協定。
我們可以在 uvicorn 的原始碼看到 HTTP, WebSocket 這 2 個 Protocols:
雖然我們都知道 Protocol 是協定的意思,不過 asyncio 的 event loop 其實也有抽象化(abstraction)的 2 個 APIs, 分別是:
- Transports
- Protocols
如果我們去翻閱官方文件,就會發現 acyncio 的 event loop 就是藉由這 2 個 APIs 處理 I/O 的工作,而且 Python 也是藉由這 2 個 APIs 開放大家實作底層 I/O 功能的彈性。
所以我們如果再進一步仔細看 uvicorn 的 Protocols 的原始碼的話,也能夠發現確實是實作 asyncio.Protocol 以處理網路 I/O 相關的工作:
但我們不需要在此深究 uvicorn 的實作細節,只要知道 Transports 與 Protocols 在 event loop 中的角色即可。
Transports 與 Protocols
asyncio event loop 關於底層 I/O 事件的處理,分為 2 個介面(interfaces):
- Transports
- Protocols
關於這 2 個介面的區別, Python 用了多個說明試圖解釋它們之間的差異。
但我覺得以下這段比較清楚解釋兩者的作用:
A different way of saying the same thing: a transport is an abstraction for a socket (or similar I/O endpoint) while a protocol is an abstraction for an application, from the transport’s point of view.
Transport 負責的是 socket 或者 I/O 相關的工作,例如傳送資料;而 Protocol 負責的是 application 層面的邏輯,譬如何時呼叫 transport 傳送資料、接收到 transport 所傳來的資料之後如何處置等等。
There is always a 1:1 relationship between transport and protocol objects: the protocol calls transport methods to send data, while the transport calls protocol methods to pass it data that has been received.
而 Transport 與 Protocol 是 1 對 1 的關係。
Protocol 有資料要傳送時,會呼叫 transport 的相關方法以傳送資料;而 transport 接收到資料之後,也會呼叫 protocol 的相關方法,讓 protocol 處理接收到的資料。
Protocols
先談談 Protocols 。
asyncio.Protocol (該類別也繼承 BaseProtocol
類別)定義多種方法,這些方法都是 callbacks, 是特定事件發生後,就會被 transport 所呼叫,以下舉 3 個方法為例:
BaseProtocol.connection_made(transport)
transport 會在連線建立之後,呼叫此方法,藉此通知 protocol 有新連線。
transport 會在收到資料之後,呼叫此方法,藉此傳遞資料給 protocol 。
transport 會在 buffer 到達高水位(hight watermark)時,呼叫此方法,藉此告知 Protocol 暫停寫入資料的行為。
我們還可以在官方文件看到關於狀態機(state machine)的描述,用於描述這些方法被呼叫的流程,例如其中一種 Streaming Protocols 的狀態機為:
start -> connection_made
[-> data_received]*
[-> eof_received]?
-> connection_lost -> end
p.s. *
代表可能會被呼叫多次, ?
代表會被呼叫 0 - 1 次
目前 Python 共提供 3 種 Protocols:
- Streaming Protocols
- Buffered Streaming Protocols
- Datagram Protocols
其中 asyncio 的低階函式 loop.create_server(), loop.create_unix_server() 與 loop.create_connection() 等,都是回傳實作 Streaming Protocols 的物件。
體驗 Protocols 的實作
我們可以先用 1 個簡單的 HTTP client 體驗一下 Protocol 在處理 application 層面的運作邏輯這件事。
以下是 1 個實作 asyncio.Protocol
的簡單 HTTP client, 該範例會對 http://127.0.0.1:65432
發出 HTTP GET 要求,並列印 response 的內容:
http_client_protocol_demo.py
import asyncio
class HTTPClientProtocol(asyncio.Protocol):
def __init__(self, on_con_lost):
self.transport = None
self.response = ""
self.on_con_lost = on_con_lost
def connection_made(self, transport):
self.transport = transport
request = (
"GET / HTTP/1.1\r\n"
"Connection: close\r\n"
"\r\n"
)
self.transport.write(request.encode())
def data_received(self, data):
self.response += data.decode()
def connection_lost(self, exc):
print("Response received:")
print(self.response)
self.on_con_lost.set_result(True)
async def main():
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
_, protocol = await loop.create_connection(
lambda: HTTPClientProtocol(on_con_lost),
'127.0.0.1', 65432
)
await on_con_lost
loop.stop()
if __name__ == "__main__":
asyncio.run(main())
執行上述程式碼之前,請先執行以下指令建立 1 個 Python 文件伺服器,如此一來我們就不需要再開發測試用的 HTTP 伺服器:
$ python -m pydoc -n 127.0.0.1 -p 65432
程式碼執行指令如下:
$ python http_client_protocol_demo.py
成功之後將可以看到類似以下的輸出,是 HTTP response 的內容:
Response received:
HTTP/1.0 200 OK
Server: BaseHTTP/0.6 Python/3.11.4
Date: Wed, 10 Jul 2024 08:26:50 GMT
Content-Type: text/html; charset=UTF-8
<!DOCTYPE>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Pydoc: Index of Modules</title>
...
以下重點說明程式碼。
首先看 coroutine main()
的部分,該 coroutine 透過 event loop 建立 1 個 Future object on_con_lost
, 後續會用以確認連線是否關閉(這也是 asyncio 常用技巧,用於確認某項工作是否正常運作, Python 官方文件也有相同用法)。
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
接著,同樣透過 event loop 對 127.0.0.1:65432
建立 1 個連線,並使用我們所定義的 HTTPClientProtocol
類別處理連線的邏輯:
_, protocol = await loop.create_connection(
lambda: HTTPClientProtocol(on_con_lost),
'127.0.0.1',
65432
)
p.s. loop.create_connection()
第 1 個參數稱為 protocol_factory
,只要在 asyncio 文件看到它,就必須知道它是 1 個 callable 且會回傳有實作 protocol 的物件
最後在連線結束後,停止 event loop 的運作,結束程式。
await on_con_lost
loop.stop()
回過頭來再看看 HTTPClientProtocol
類別,這個類別繼承 asyncio.Protocol
並會在 __init__(self, on_con_lost)
方法初始化 3 個屬性:
self.transport
用以在
connection_made(transport)
方法內綁定 transportself.on_con_lost
用以綁定我們從 coroutine
main()
傳進來的 Future objectself.response
用以在
data_received(data)
方法內儲存 transport 收到的資料
class HTTPClientProtocol(asyncio.Protocol):
def __init__(self, on_con_lost):
self.transport = None
self.response = ""
self.on_con_lost = on_con_lost
剩下的部分是實作 connection_made()
, data_received()
以及 connection_lost()
這些 asyncio.Protocol
類別所規定的方法。
當連線建立成功之後, transport 會呼叫 protocol 的 connection_made
方法,並把 transport 自己當作參數傳 connection_made
方法,所以我們可以在此處綁定 self.transport
屬性,並使用 self.transport
對 127.0.0.1:65432
發出 HTTP GET request:
def connection_made(self, transport):
self.transport = transport
request = (
"GET / HTTP/1.1\r\n"
"Connection: close\r\n"
"\r\n"
)
self.transport.write(request.encode())
當 transport 收到來自 127.0.0.1:65432
的回應資料時,就會呼叫 protocol 的 data_received
方法,並把收到的資料當作參數傳入,所以我們可以在此處組合完整的回應資料:
def data_received(self, data):
self.response += data.decode()
最後,當連線關閉時, transport 又會呼叫 protocol 的 connection_lost
方法,我們在此處列印完整的回應資料,並且在此處對 protocol 的 on_con_lost
屬性設定 result, 如此一來 coroutine main()
中的 await on_con_lost
就會停止等待狀態,進而執行下一行停止 event loop 的程式碼:
def connection_lost(self, exc):
print("Response received:")
print(self.response)
self.on_con_lost.set_result(True)
至此,相信大家對於 Protocol 的運作已經有初步的概念。整體而言, Protocol 的實作頗有 socketserver 模組 的感覺,開發者只要專注在如何處理特定事件即可(例如新連線、新資料、連線關閉等事件)。
p.s. Python 也提供 asyncio 高階函式實作 client, server 的功能,使用上更簡單,詳見 Streams
有了初步概念之後,其實也能夠知道如何透過實作 protocol 做出簡單的 HTTP 伺服器:
http_protocol_server.py
import asyncio
class HTTPServerProtocol(asyncio.Protocol):
def __init__(self):
self.transport = None
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
self.request = data.decode()
print('Data received: {!r}'.format(self.request))
self.transport.write((
b"HTTP/1.1 200 OK\r\n"
b"Content-Type: text/html\r\n"
b"\r\n"
b"<html><head><title>Protocol</title></head><body>"
b"<h1>Hello World</h1>"
b"</body></html>"
))
self.transport.close()
async def main():
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: HTTPServerProtocol(),
'127.0.0.1', 65432
)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())
上述程式碼也是遵循實作 asyncio.Protocol
的方法,實作相對應的事件處理方法而已,並且使用 asyncio 低階函式 loop.create_server() 建立 1 個伺服器,此處不再多做贅述。
p.s. loop.create_server()
第 1 個參數也是 protocol_factory
上述程式碼執行指令如下:
$ python http_protocol_server.py
執行之後,使用瀏覽器輸入網址 http://127.0.0.1:65432
將可以看到以下畫面:
Transport 從何而來?
目前為止,我們並不需要特意實作 Transport, 這是因為預設我們所使用的 asyncio event loop 會自動提供 1 個已經實作好的 transport, 我們可以從 asyncio 低階函式 loop.create_connection()
的原始碼中找到以下 2 個片段:
transport, protocol = await self._create_connection_transport(
sock, protocol_factory, ssl, server_hostname,
ssl_handshake_timeout=ssl_handshake_timeout,
ssl_shutdown_timeout=ssl_shutdown_timeout)
與
async def _create_connection_transport(
self, sock, protocol_factory, ssl,
server_hostname, server_side=False,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None):
sock.setblocking(False)
protocol = protocol_factory()
waiter = self.create_future()
if ssl:
sslcontext = None if isinstance(ssl, bool) else ssl
transport = self._make_ssl_transport(
sock, protocol, sslcontext, waiter,
server_side=server_side, server_hostname=server_hostname,
ssl_handshake_timeout=ssl_handshake_timeout,
ssl_shutdown_timeout=ssl_shutdown_timeout)
else:
transport = self._make_socket_transport(sock, protocol, waiter)
try:
await waiter
except:
transport.close()
raise
return transport, protocol
上述 2 片段屬於 BaseEventLoop
類別,而繼承 BaseEventLoop
的 BaseSelectorEventLoop 類別與 BaseProactorEventLoop 類別,都各自實作 _make_ssl_transport
與 _make_socket_transport
方法,藉此提供 transport 給 protocol 使用以及將 protocol 物件傳遞給 transport 使用。
p.s. BaseProactorEventLoop 是 Windows 在用的 event loop, 詳見 Event Loop Implementations ,本文僅會探討 Linux, Unix 所使用的 BaseSelectorEventLoop
Transports
如前文所述,我們實際上不需要特意開發 transport, 但我們還是可以閱讀 Python asyncio 模組中的 transports.py
的原始碼看看 Transport 做了哪些事。
首先 transports 分為 5 種:
- Read only transports
- Write only transports
- Transports (read & write)
- Datagram transport (給 UDP 使用的 transport )
- Subprocess transport (給 subprocess 使用的 transport, 是 asyncio 做 Inter-Process Communication 的 transport, 因為 IPC 屬於 1 種 I/O 行為)
這 5 種都繼承自 BaseTransport 類別,從該類別可以看到多個 transport 的基本方法定義,包含:
set_protocol()
設定 protocol 物件get_protocol()
取得 protocol 物件close()
關閉 transport
而 ReadTransport 類別與 WriteTransport 類別又繼承 BaseTransport 類別,並各自定義讀、寫相關方法,包含:
is_reading()
是否正在接收資料pause_reading()
暫停接收資料resume_reading()
恢復接收資料set_write_buffer_limits()
設定 buffer 大小write()
寫入資料abort()
立即關閉 transport
這些就是 transport 在做的事,可以看到它專注在傳送/接收資料的細節上,明顯與 protocol 專注在如何使用/回應資料的行為不同。
詳細的實作範例可以參考 _SelectorTransport 與 _SelectorSocketTransport 這 2 個類別。
值得注意的是, _SelectorTransport 還繼承 transports._FlowControlMixin 類別,代表它有額外實作 flow control 功能,有就是說它具有依據緩衝區使用的情況,通知 protocol 暫停寫入、恢復寫入的能力:
class _SelectorTransport(transports._FlowControlMixin,
transports.Transport):
max_size = 256 * 1024 # Buffer size passed to recv().
這告訴我們一件事, flow control 的能力是由 transport 所負責提供的!
Event loop 是在哪裡使用 transport 與 protocol 的?
最後,我們來看看 event loop 是怎麼使用 transport 與 protocol 的。
這一切可以從 asyncio 低階函式 loop.create_server() 開始。
asyncio 的低階函式 loop.create_server() 最後會回傳 1 個 Server object (原始碼):
server = Server(self, sockets, protocol_factory,
ssl, backlog, ssl_handshake_timeout,
ssl_shutdown_timeout)
if start_serving:
server._start_serving()
# Skip one loop iteration so that all 'loop.add_reader'
# go through.
await tasks.sleep(0)
if self._debug:
logger.info("%r is serving", server)
return server
這個 Server object 會取得我們傳入的參數 protocol_factory
,然後在我們呼叫 Server object 的 serve_forever() 方法時, Server object 會再呼叫 _start_serving() 方法將 protocol_factory
傳遞給 event loop 的 _start_serving() 方法:
然後, event loop 就可以在它的 _start_serving() 方法裡,為 Server object 的 socket object 添加新連線的事件處理(原始碼):
def _start_serving(self, protocol_factory, sock,
sslcontext=None, server=None, backlog=100,
ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
self._add_reader(sock.fileno(), self._accept_connection,
protocol_factory, sock, sslcontext, server, backlog,
ssl_handshake_timeout, ssl_shutdown_timeout)
_add_reader() 原始碼:
def _add_reader(self, fd, callback, *args):
self._check_closed()
handle = events.Handle(callback, args, self, None)
key = self._selector.get_map().get(fd)
if key is None:
self._selector.register(fd, selectors.EVENT_READ,
(handle, None))
else:
mask, (reader, writer) = key.events, key.data
self._selector.modify(fd, mask | selectors.EVENT_READ,
(handle, writer))
if reader is not None:
reader.cancel()
return handle
可以看到 event loop 透過 self._selector.register(fd, selectors.EVENT_READ,(handle, None))
將接受新連線的 callback 註冊起來,當 Server 有新連線時,就會使用 event loop 的 _accept_connection() 方法接受新連線,而 event loop 的 _accept_connection() 也會呼叫 _accept_connection2() 建立 protocol 與 transport:
如果我們再進一步檢閱 _make_socket_transport() 方法的話,就會發現它是透過 _SelectorSocketTransport 類別將 connection_made()
, data_received()
等 protocol 的方法用 self._loop.call_soon()
的方式將工作放進 event loop 排程執行:
從這邊可以看出 1 個連線(connection)會使用 1 個獨立的 transport 與 protocol ,當有多個連線時, event loop 就會有多個 transports 與 protocols 要處理。
最後,我們看回「你知道 asyncio 的 event loop 是怎麼 loop 的嗎?談 event loop 的排程與執行」 1 文所提到的 _run_once()
程式碼:
你會發現上圖的紅框 2 有 2 行:
event_list = self._selector.select(timeout)
self._process_events(event_list)
是的,監聽 I/O 事件進而處理新連線就是在此處發生的!
所以每次 event loop 執行 _run_once()
時,都有很短的時間在處理 I/O 事件,並將這些事件透過前述的 transport 與 protocol 將工作放入排程。
這就是 event loop 如何處理 I/O 工作的介紹!
總結
雖然 asyncio 的 event loop 相當複雜,不過只要把握要點,你也能夠漸漸理解 event loop 的運作,進而提高 asyncio 程式設計的掌握能力!
以上!
Enjoy!