你知道 asyncio 的 event loop 是怎麼 loop 的嗎?談 event loop 如何處理 I/O 工作

Posted on  Jul 11, 2024  in  Python 程式設計 - 高階  by  Amo Chen  ‐ 8 min read

在「你知道 asyncio 的 event loop 是怎麼 loop 的嗎?談 event loop 的排程與執行」一文中,我們談了 asyncio 是如何進行工作排程與執行工作,不過沒有提到它如何處理 I/O 工作的細節(例如網路傳輸)。

本文將從知名 ASGI Web Server 開源專案 uvicorn 開始,搭配 Python asyncio 官方文件與簡單的範例,一步步介紹 event loop 是如何處理 I/O 工作。

前言

本文是「你知道 asyncio 的 event loop 是怎麼 loop 的嗎?談 event loop 的排程與執行」的延伸之作,專注於講解 asyncio event loop 的運作與細節,對 asyncio event loop 不熟悉的讀者,強烈建議請先閱讀該文,建立對 asyncio event loop 的正確認知,以對本文有更好的理解與掌握能力。

從知名的開源專案 uvicorn 出發

絕大多數情況下,使用 asyncio 是不需要特意去理解它底層如何做 I/O 的工作,因為 Python 已經將 event loop 底層打理得相當不錯,站在巨人肩膀上,我們可以更專注在程式的核心開發工作上。

不過深入理解 asyncio 的 event loop 仍能給我們帶來一些好處,除了能提高對 asyncio 的掌握能力之外,也能夠在有需要時知道如何改寫底層(low-level)功能,實作更高效能的 asyncio 應用,例如知名的 ASGI Web Sever 專案 uvicorn, 即是透過實作 asyncio 的 Protocol 讓 ASGI Web Server 能夠支援 HTTP/1.1 與 WebSocket 協定。

我們可以在 uvicorn 的原始碼看到 HTTP, WebSocket 這 2 個 Protocols:

uvicorn-protocols.png

雖然我們都知道 Protocol 是協定的意思,不過 asyncio 的 event loop 其實也有抽象化(abstraction)的 2 個 APIs, 分別是:

  • Transports
  • Protocols

如果我們去翻閱官方文件,就會發現 acyncio 的 event loop 就是藉由這 2 個 APIs 處理 I/O 的工作,而且 Python 也是藉由這 2 個 APIs 開放大家實作底層 I/O 功能的彈性。

所以我們如果再進一步仔細看 uvicorn 的 Protocols 的原始碼的話,也能夠發現確實是實作 asyncio.Protocol 以處理網路 I/O 相關的工作:

asyncio-protocol.png

但我們不需要在此深究 uvicorn 的實作細節,只要知道 Transports 與 Protocols 在 event loop 中的角色即可。

Transports 與 Protocols

asyncio event loop 關於底層 I/O 事件的處理,分為 2 個介面(interfaces):

  1. Transports
  2. Protocols

關於這 2 個介面的區別, Python 用了多個說明試圖解釋它們之間的差異。

但我覺得以下這段比較清楚解釋兩者的作用:

A different way of saying the same thing: a transport is an abstraction for a socket (or similar I/O endpoint) while a protocol is an abstraction for an application, from the transport’s point of view.

Transport 負責的是 socket 或者 I/O 相關的工作,例如傳送資料;而 Protocol 負責的是 application 層面的邏輯,譬如何時呼叫 transport 傳送資料、接收到 transport 所傳來的資料之後如何處置等等。

There is always a 1:1 relationship between transport and protocol objects: the protocol calls transport methods to send data, while the transport calls protocol methods to pass it data that has been received.

而 Transport 與 Protocol 是 1 對 1 的關係

transport-protocol-1-on-1.png

Protocol 有資料要傳送時,會呼叫 transport 的相關方法以傳送資料;而 transport 接收到資料之後,也會呼叫 protocol 的相關方法,讓 protocol 處理接收到的資料。

Protocols

先談談 Protocols 。

asyncio.Protocol (該類別也繼承 BaseProtocol 類別)定義多種方法,這些方法都是 callbacks, 是特定事件發生後,就會被 transport 所呼叫,以下舉 3 個方法為例:

我們還可以在官方文件看到關於狀態機(state machine)的描述,用於描述這些方法被呼叫的流程,例如其中一種 Streaming Protocols 的狀態機為:

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

p.s. * 代表可能會被呼叫多次, ? 代表會被呼叫 0 - 1 次

目前 Python 共提供 3 種 Protocols:

  • Streaming Protocols
  • Buffered Streaming Protocols
  • Datagram Protocols

其中 asyncio 的低階函式 loop.create_server()loop.create_unix_server()loop.create_connection() 等,都是回傳實作 Streaming Protocols 的物件。

體驗 Protocols 的實作

我們可以先用 1 個簡單的 HTTP client 體驗一下 Protocol 在處理 application 層面的運作邏輯這件事。

以下是 1 個實作 asyncio.Protocol 的簡單 HTTP client, 該範例會對 http://127.0.0.1:65432 發出 HTTP GET 要求,並列印 response 的內容:

http_client_protocol_demo.py

import asyncio

class HTTPClientProtocol(asyncio.Protocol):
    def __init__(self, on_con_lost):
        self.transport = None
        self.response = ""
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport
        request = (
            "GET / HTTP/1.1\r\n"
            "Connection: close\r\n"
            "\r\n"
        )
        self.transport.write(request.encode())

    def data_received(self, data):
        self.response += data.decode()

    def connection_lost(self, exc):
        print("Response received:")
        print(self.response)
        self.on_con_lost.set_result(True)


async def main():
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    _, protocol = await loop.create_connection(
        lambda: HTTPClientProtocol(on_con_lost),
        '127.0.0.1', 65432
    )
    await on_con_lost
    loop.stop()


if __name__ == "__main__":
    asyncio.run(main())

執行上述程式碼之前,請先執行以下指令建立 1 個 Python 文件伺服器,如此一來我們就不需要再開發測試用的 HTTP 伺服器:

$ python -m pydoc -n 127.0.0.1 -p 65432

程式碼執行指令如下:

$ python http_client_protocol_demo.py

成功之後將可以看到類似以下的輸出,是 HTTP response 的內容:

Response received:
HTTP/1.0 200 OK
Server: BaseHTTP/0.6 Python/3.11.4
Date: Wed, 10 Jul 2024 08:26:50 GMT
Content-Type: text/html; charset=UTF-8

<!DOCTYPE>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Pydoc: Index of Modules</title>
...

以下重點說明程式碼。

首先看 coroutine main() 的部分,該 coroutine 透過 event loop 建立 1 個 Future object on_con_lost , 後續會用以確認連線是否關閉(這也是 asyncio 常用技巧,用於確認某項工作是否正常運作, Python 官方文件也有相同用法)。

loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()

接著,同樣透過 event loop 對 127.0.0.1:65432 建立 1 個連線,並使用我們所定義的 HTTPClientProtocol 類別處理連線的邏輯:

_, protocol = await loop.create_connection(
    lambda: HTTPClientProtocol(on_con_lost),
    '127.0.0.1',
    65432
)

p.s. loop.create_connection() 第 1 個參數稱為 protocol_factory ,只要在 asyncio 文件看到它,就必須知道它是 1 個 callable 且會回傳有實作 protocol 的物件

最後在連線結束後,停止 event loop 的運作,結束程式。

await on_con_lost
loop.stop()

回過頭來再看看 HTTPClientProtocol 類別,這個類別繼承 asyncio.Protocol 並會在 __init__(self, on_con_lost) 方法初始化 3 個屬性:

  • self.transport

    用以在 connection_made(transport) 方法內綁定 transport

  • self.on_con_lost

    用以綁定我們從 coroutine main() 傳進來的 Future object

  • self.response

    用以在 data_received(data) 方法內儲存 transport 收到的資料

class HTTPClientProtocol(asyncio.Protocol):
    def __init__(self, on_con_lost):
        self.transport = None
        self.response = ""
        self.on_con_lost = on_con_lost

剩下的部分是實作 connection_made() , data_received() 以及 connection_lost() 這些 asyncio.Protocol 類別所規定的方法。

當連線建立成功之後, transport 會呼叫 protocol 的 connection_made 方法,並把 transport 自己當作參數傳 connection_made 方法,所以我們可以在此處綁定 self.transport 屬性,並使用 self.transport127.0.0.1:65432 發出 HTTP GET request:

def connection_made(self, transport):
    self.transport = transport
    request = (
        "GET / HTTP/1.1\r\n"
        "Connection: close\r\n"
        "\r\n"
    )
    self.transport.write(request.encode())

當 transport 收到來自 127.0.0.1:65432 的回應資料時,就會呼叫 protocol 的 data_received 方法,並把收到的資料當作參數傳入,所以我們可以在此處組合完整的回應資料:

def data_received(self, data):
    self.response += data.decode()

最後,當連線關閉時, transport 又會呼叫 protocol 的 connection_lost 方法,我們在此處列印完整的回應資料,並且在此處對 protocol 的 on_con_lost 屬性設定 result, 如此一來 coroutine main() 中的 await on_con_lost 就會停止等待狀態,進而執行下一行停止 event loop 的程式碼:

def connection_lost(self, exc):
    print("Response received:")
    print(self.response)
    self.on_con_lost.set_result(True)

至此,相信大家對於 Protocol 的運作已經有初步的概念。整體而言, Protocol 的實作頗有 socketserver 模組 的感覺,開發者只要專注在如何處理特定事件即可(例如新連線、新資料、連線關閉等事件)。

p.s. Python 也提供 asyncio 高階函式實作 client, server 的功能,使用上更簡單,詳見 Streams

有了初步概念之後,其實也能夠知道如何透過實作 protocol 做出簡單的 HTTP 伺服器:

http_protocol_server.py

import asyncio


class HTTPServerProtocol(asyncio.Protocol):
    def __init__(self):
        self.transport = None

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        self.request = data.decode()
        print('Data received: {!r}'.format(self.request))
        self.transport.write((
            b"HTTP/1.1 200 OK\r\n"
            b"Content-Type: text/html\r\n"
            b"\r\n"
            b"<html><head><title>Protocol</title></head><body>"
            b"<h1>Hello World</h1>"
            b"</body></html>"
        ))
        self.transport.close()

async def main():
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: HTTPServerProtocol(),
        '127.0.0.1', 65432
    )
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())

上述程式碼也是遵循實作 asyncio.Protocol 的方法,實作相對應的事件處理方法而已,並且使用 asyncio 低階函式 loop.create_server() 建立 1 個伺服器,此處不再多做贅述。

p.s. loop.create_server() 第 1 個參數也是 protocol_factory

上述程式碼執行指令如下:

$ python http_protocol_server.py

執行之後,使用瀏覽器輸入網址 http://127.0.0.1:65432 將可以看到以下畫面:

asyncio-protocol-server.png

Transport 從何而來?

目前為止,我們並不需要特意實作 Transport, 這是因為預設我們所使用的 asyncio event loop 會自動提供 1 個已經實作好的 transport, 我們可以從 asyncio 低階函式 loop.create_connection()原始碼中找到以下 2 個片段:

transport, protocol = await self._create_connection_transport(
    sock, protocol_factory, ssl, server_hostname,
    ssl_handshake_timeout=ssl_handshake_timeout,
    ssl_shutdown_timeout=ssl_shutdown_timeout)

async def _create_connection_transport(
        self, sock, protocol_factory, ssl,
        server_hostname, server_side=False,
        ssl_handshake_timeout=None,
        ssl_shutdown_timeout=None):

    sock.setblocking(False)

    protocol = protocol_factory()
    waiter = self.create_future()
    if ssl:
        sslcontext = None if isinstance(ssl, bool) else ssl
        transport = self._make_ssl_transport(
            sock, protocol, sslcontext, waiter,
            server_side=server_side, server_hostname=server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
    else:
        transport = self._make_socket_transport(sock, protocol, waiter)

    try:
        await waiter
    except:
        transport.close()
        raise

    return transport, protocol

上述 2 片段屬於 BaseEventLoop 類別,而繼承 BaseEventLoopBaseSelectorEventLoop 類別與 BaseProactorEventLoop 類別,都各自實作 _make_ssl_transport_make_socket_transport 方法,藉此提供 transport 給 protocol 使用以及將 protocol 物件傳遞給 transport 使用。

p.s. BaseProactorEventLoop 是 Windows 在用的 event loop, 詳見 Event Loop Implementations ,本文僅會探討 Linux, Unix 所使用的 BaseSelectorEventLoop

Transports

如前文所述,我們實際上不需要特意開發 transport, 但我們還是可以閱讀 Python asyncio 模組中的 transports.py原始碼看看 Transport 做了哪些事。

首先 transports 分為 5 種:

  • Read only transports
  • Write only transports
  • Transports (read & write)
  • Datagram transport (給 UDP 使用的 transport )
  • Subprocess transport (給 subprocess 使用的 transport, 是 asyncio 做 Inter-Process Communication 的 transport, 因為 IPC 屬於 1 種 I/O 行為)

這 5 種都繼承自 BaseTransport 類別,從該類別可以看到多個 transport 的基本方法定義,包含:

  • set_protocol() 設定 protocol 物件
  • get_protocol() 取得 protocol 物件
  • close() 關閉 transport

ReadTransport 類別與 WriteTransport 類別又繼承 BaseTransport 類別,並各自定義讀、寫相關方法,包含:

  • is_reading() 是否正在接收資料
  • pause_reading() 暫停接收資料
  • resume_reading() 恢復接收資料
  • set_write_buffer_limits() 設定 buffer 大小
  • write() 寫入資料
  • abort() 立即關閉 transport

這些就是 transport 在做的事,可以看到它專注在傳送/接收資料的細節上,明顯與 protocol 專注在如何使用/回應資料的行為不同。

詳細的實作範例可以參考 _SelectorTransport_SelectorSocketTransport 這 2 個類別。

值得注意的是, _SelectorTransport 還繼承 transports._FlowControlMixin 類別,代表它有額外實作 flow control 功能,有就是說它具有依據緩衝區使用的情況,通知 protocol 暫停寫入、恢復寫入的能力:

class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):

    max_size = 256 * 1024  # Buffer size passed to recv().

這告訴我們一件事, flow control 的能力是由 transport 所負責提供的!

Event loop 是在哪裡使用 transport 與 protocol 的?

最後,我們來看看 event loop 是怎麼使用 transport 與 protocol 的。

這一切可以從 asyncio 低階函式 loop.create_server() 開始。

asyncio 的低階函式 loop.create_server() 最後會回傳 1 個 Server object (原始碼):

server = Server(self, sockets, protocol_factory,
                ssl, backlog, ssl_handshake_timeout,
                ssl_shutdown_timeout)
if start_serving:
    server._start_serving()
    # Skip one loop iteration so that all 'loop.add_reader'
    # go through.
    await tasks.sleep(0)

if self._debug:
    logger.info("%r is serving", server)
return server

這個 Server object 會取得我們傳入的參數 protocol_factory ,然後在我們呼叫 Server object 的 serve_forever() 方法時, Server object 會再呼叫 _start_serving() 方法將 protocol_factory 傳遞給 event loop 的 _start_serving() 方法:

callstack.png

然後, event loop 就可以在它的 _start_serving() 方法裡,為 Server object 的 socket object 添加新連線的事件處理(原始碼):

def _start_serving(self, protocol_factory, sock,
                    sslcontext=None, server=None, backlog=100,
                    ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
                    ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
    self._add_reader(sock.fileno(), self._accept_connection,
                        protocol_factory, sock, sslcontext, server, backlog,
                        ssl_handshake_timeout, ssl_shutdown_timeout)

_add_reader() 原始碼:

def _add_reader(self, fd, callback, *args):
    self._check_closed()
    handle = events.Handle(callback, args, self, None)
    key = self._selector.get_map().get(fd)
    if key is None:
        self._selector.register(fd, selectors.EVENT_READ,
                                (handle, None))
    else:
        mask, (reader, writer) = key.events, key.data
        self._selector.modify(fd, mask | selectors.EVENT_READ,
                                (handle, writer))
        if reader is not None:
            reader.cancel()
    return handle

可以看到 event loop 透過 self._selector.register(fd, selectors.EVENT_READ,(handle, None)) 將接受新連線的 callback 註冊起來,當 Server 有新連線時,就會使用 event loop 的 _accept_connection() 方法接受新連線,而 event loop 的 _accept_connection() 也會呼叫 _accept_connection2() 建立 protocol 與 transport:

transport-and-protocol.png

如果我們再進一步檢閱 _make_socket_transport() 方法的話,就會發現它是透過 _SelectorSocketTransport 類別將 connection_made(), data_received() 等 protocol 的方法用 self._loop.call_soon() 的方式將工作放進 event loop 排程執行:

selector-socket-transport.png

從這邊可以看出 1 個連線(connection)會使用 1 個獨立的 transport 與 protocol ,當有多個連線時, event loop 就會有多個 transports 與 protocols 要處理。

最後,我們看回「你知道 asyncio 的 event loop 是怎麼 loop 的嗎?談 event loop 的排程與執行」 1 文所提到的 _run_once() 程式碼:

run-once-overview.png

你會發現上圖的紅框 2 有 2 行:

event_list = self._selector.select(timeout)
self._process_events(event_list)

p.s. _process_events() 原始碼

是的,監聽 I/O 事件進而處理新連線就是在此處發生的!

所以每次 event loop 執行 _run_once() 時,都有很短的時間在處理 I/O 事件,並將這些事件透過前述的 transport 與 protocol 將工作放入排程。

這就是 event loop 如何處理 I/O 工作的介紹!

總結

雖然 asyncio 的 event loop 相當複雜,不過只要把握要點,你也能夠漸漸理解 event loop 的運作,進而提高 asyncio 程式設計的掌握能力!

以上!

Enjoy!

References

Transports and Protocols

Python 3 - Event Loop

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!