你知道 asyncio 的 event loop 是怎麼 loop 的嗎?談 event loop 的排程與執行
Posted on Jun 20, 2024 in Python 程式設計 - 高階 by Amo Chen ‐ 8 min read
開發過 asyncio 相關應用的人,想必都知道 event loop 的重要性。
然而,你是否具體研究過 event loop 的程式碼,並了解它是如何運作的呢?
深入理解 event loop 不僅能幫助我們解決意料之外的問題,還能讓我們使用 asyncio 時更得心應手!
閱讀本文,讓我們跟 event loop 一起 loop 起來!
本文環境
- Python 3
前言
本文是「Python asyncio 從不會到上路」的延伸之作,專注於講解 event loop 的運作與細節,對 asyncio 不熟悉的讀者,強烈建議請先閱讀該文,建立對 asyncio 的正確認知,以對本文有更好的理解與掌握能力。
asyncio.run() 做了什麼事?
我們都知道 coroutine 可以透過呼叫 asyncio.run() 執行。
Application developers should typically use the high-level asyncio functions, such as asyncio.run(), and should rarely need to reference the loop object or call its methods.
而且, Python 官方文件也提及開發者應該盡量使用 asyncio.run()
之類的 asyncio 高階函式(high-level function)執行 coroutine, 少透過存取 event loop object 與呼叫 event loop 相關的方法。
不過我還是很好奇 asyncio.run() 具體做了什麼事。
asyncio.run()
是 1 個高階函式(high-level function),它負責將 coroutine 交給 event loop 處理,並回傳 coroutine 執行結果,而這個過程被 Python 包裝的極為簡單, asyncio.run()
的原始碼相當簡單:
def run(main, *, debug=None, loop_factory=None):
if events._get_running_loop() is not None:
# fail fast with short traceback
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
with Runner(debug=debug, loop_factory=loop_factory) as runner:
return runner.run(main)
p.s. 實際上 asyncio.run()
裡的 Runner 類別還包含建立 event loop 以及關閉 event loop 等等事務
上述程式碼可以看到 asyncio.run()
是不能在 1 個 event loop 內進行呼叫的,反之亦然,原因在於它會先檢查是否已經有 event loop 正在執行(running),而真正的原因是 1 個 Python 執行緒只允許 1 個 event loop 進入執行狀態(running),而 asyncio.run()
執行時會建立 1 個全新的 event loop 給自己使用,如果已經有其他 event loop 正處於執行狀態的話,就會違反這個規定。
p.s. 要讓 event loop 進入執行狀態的話,則是呼叫 loop.run_until_complete(future) 或 loop.run_forever()
在 event loop 的實作也可以看到它有 1 個方法會檢查是否已經有其他的 event loop 正在執行(原始碼):
def _check_running(self):
if self.is_running():
raise RuntimeError('This event loop is already running')
if events._get_running_loop() is not None:
raise RuntimeError('Cannot run the event loop while another loop is running')
看到此處,如果未來再次遇到 asyncio 的錯誤訊息 RuntimeError: Cannot run the event loop while another loop is running
的話,就可以直覺想到原因是已經有其他 event loop 正處於執行狀態(running)所造成的。
以下程式碼模擬在 1 個 event loop 裡試圖又執行 1 個 event loop 的情況:
import asyncio
async def print_hello():
await asyncio.sleep(1)
print("Hello")
async def do_jobs():
await asyncio.sleep(1)
loop = asyncio.new_event_loop()
loop.run_until_complete(print_hello)
asyncio.run(do_jobs())
上述程式碼執行結果如下,從執行結果可以看到前文所述的錯誤訊息 Cannot run the event loop while another loop is running
:
Traceback (most recent call last):
File "test.py", line 17, in <module>
asyncio.run(do_jobs())
File "/.../asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/.../asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "test.py", line 12, in do_jobs
loop.run_until_complete(print_hello)
File "/.../asyncio/base_events.py", line 592, in run_until_complete
self._check_running()
File "/.../asyncio/base_events.py", line 554, in _check_running
raise RuntimeError(
RuntimeError: Cannot run the event loop while another loop is running
造成錯誤的根本原因是 asyncio.run()
本身已經執行 1 個 event loop 了,但在 coroutine do_jobs()
中又試圖以 1 個新的 event loop 執行 coroutine print_hello()
。
修正後的程式碼如下:
import asyncio
async def print_hello():
await asyncio.sleep(1)
print("Hello")
async def do_jobs():
await asyncio.sleep(1)
await print_hello()
asyncio.run(do_jobs())
只要拿掉試圖執行新的 event loop 部分,並將 print_hello()
加上 await
關鍵字進行呼叫即可。
Runner 類別
在前文可以看到 asyncio.run()
最後是呼叫 Runner 類別處理我們所呼叫的 coruntine, 所以真正建立 event loop 以及執行 coroutine 等工作,都是 Runner 負責的。
前文的程式碼 asyncio.run(do_jobs())
其實等同於:
with asyncio.Runner() as runner:
runner.run(do_jobs())
那 Runner 具體做了什麼事?
直接看 Runner 類別的 run(coro, *, context=None) 方法的程式碼:
上述程式碼主要做 4 件事(對應數字):
- 建立 event loop
- 將 coroutine 轉為 asyncio Task object
- 把 asyncio Task object 交由 event loop 排程後執行
- 關閉 event loop
這就是 Runner 實際做的事情,從這邊也可以看到 event loop 扮演著相當重要的角色,如果要執行 1 個 coroutine, 就需要建立 event loop 並請 event loop 排程執行它。
Event Loop
The event loop is the core of every asyncio application. Event loops run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses.
知道 asyncio.run()
是將 coroutine 交給 event loop 執行以及 1 個執行緒只能有 1 個 處於執行狀態的 event loop 之後, 就可以開始研究 event loop 的實作細節了!
Event loop 的實作細節主要在 BaseEventLoop 類別,它最關鍵的部分是 1 個 while 迴圈,不斷地迭代(iteration)進行排程(schedule)、執行非同步任務、回呼函式(callbacks)等工作,其關鍵程式碼如下圖所示,可以看到存在 1 個 while 迴圈:
p.s. 這裡就是 event loop 的 loop
p.s. 此處的回呼函式(callbacks)指的是我們透過 loop.call_soon() , loop_call_later() 等方法,交由 event loop 排程執行的 function, subroutine, 你可以從官方文件中看到這些方法的參數名稱是 callback 。
其中排程、執行 coroutine 等工作都被封裝在 self._run_once()
之中。
p.s. _run_once()
很可能會給人命名上的混淆,但它的真正意思是 “Run one full iteration of the event loop.” , 完整迭代 1 次 event loop 並執行相關工作
self._run_once()
程式碼如下圖所示:
程式碼感覺很複雜,不過大致可以分為 4 個區塊進行說明,上述 4 個區塊做的事情是(對應數字):
- 檢查與移除被取消的工作
- 輪詢(polls)並處理 I/O 事件,例如,讀取資料、寫入資料等等
- 檢查延時工作是否可以開始處理,也就是使用 loop.call_later(), loop.call_at() 的 delayed callbacks
- 執行工作
知道 event loop 做了上述這 4 件事之後,相信多數人此刻還是跟我一樣,對於 event loop 排程與執行還是處於一知半解的狀態。
沒關係,我們慢慢破解。
首先,我們從程式碼可以知道 event loop 將工作分成 2 種狀態:
- scheduled, 對應
self._scheduled
- ready, 對應
self._ready
只有處於 ready 狀態的工作才會被執行。
先從數字 4 區塊開始,這部分最重要的是執行工作的程式碼(剩下屬於 debug 的部分),簡化之後變成:
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
handle._run()
從上述程式碼可以知道 self._ready
裡面存著 0 至 n 個要執行的工作(實際上 self._ready
是 1 個 deque )。
如果 handle
已經處於取消狀態,它就不會被執行,否則就呼叫 handle
的 _run()
方法執行工作。
handle
是 asyncio 的回呼函式(callbacks) wrapper, 分為 2 種:
呼叫 loop.call_soon(), loop.call_soon_threadsafe() 之後會回傳的 object
呼叫 loop.call_later(), loop.call_at() 之後會回傳的 object
我們實際用以下程式碼實驗看看,以下程式碼使用 loop.call_soon()
與 loop.call_later()
告訴 event loop 要執行 2 個工作,並查看 event loop 工作列表的狀態:
loop = asyncio.get_event_loop()
loop.call_soon(print, "Hello, soon")
loop.call_later(1, print, "Hellp, later")
print('scheduled:', loop._scheduled)
print('ready:', loop._ready)
上述程式碼執行結果如下:
scheduled: [<TimerHandle when=1.042924291 print('Hellp, later')>]
ready: deque([<Handle print('Hello, soon')>])
從執行結果可以看到,如果使用 loop.call_soon()
就會將工作放到 event loop 的 _ready
屬性內,以求儘早執行;如果使用 loop.call_later()
就會將工作放到 event loop 的 _scheduled
屬性內,等待時間到了之後才會轉到 event loop 的 _ready
屬性內。
此外,如果想執行 Handle object 所包裝的工作,則是呼叫 Handle object 的 _run()
方法,例如:
loop = asyncio.get_event_loop()
handle = loop.call_soon(print, "Hello")
handle._run()
上述程式執行之後,就會印出 Hello
, event loop 也是透過呼叫 Handle object 的 _run()
方法執行工作。
至此,我們基本可以知道 event loop 就是透過以下 4 個方法方法對工作進行排程:
那 scheduled
是如何變成 ready
的?
原來 TimerHandle object 裡有個 _when
屬性,這個屬性會決定在哪個 monotonic time (秒為單位)之後可以執行,而 event loop 也會藉由呼叫 time.monotonic() 得知現在時間是否已經到達/超過 TimerHandle 預定執行的時間點,如果到達/超過就可以轉到 event loop 的 _ready
屬性,這就是數字 3 區塊所做的事情:
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
我們可以用以下程式碼玩看看 event loop 的 monotonic time 以及存取 TimerHandle 的 _when
屬性:
import asyncio
loop = asyncio.get_event_loop()
handle = loop.call_later(10, print, "Hellp, later")
print(f'{handle._when=} {loop.time()=}')
print('Is timer handle object ready?', handle._when > loop.time() + loop._clock_resolution)
上述程式執行結果如下:
handle._when=10.057251541 loop.time()=0.057257791
Is timer handle object ready? False
前述程式碼故意設定在 10s 之後才執行 print
, 所以 TimerHandle object 在時間沒經過 10 秒之前,都會是 not ready 的狀態。
有人可能會疑問,為什麼 10 秒還會有很多小數點,這是因為 loop.call_later()
在設定 _when
屬性時,也有呼叫一次 loop.time()
取得 monotonic time 的關係, loop.call_later()
的程式碼如下,可以看到 _when
是由 self.time() + delay
的值所決定:
def call_later(self, delay, callback, *args, context=None):
if delay is None:
raise TypeError('delay must not be None')
timer = self.call_at(self.time() + delay, callback, *args,
context=context)
if timer._source_traceback:
del timer._source_traceback[-1]
return timer
這就是 scheduled
如何轉成 ready
的邏輯!
總結一下:
- Event loop 裡的排程分為
scheduled
與ready
2 種狀態,只有ready
會被執行。 scheduled
狀態的工作會看是否到達/超過預定時間(也就是_when
屬性)才會轉成ready
狀態。- 要將工作放進 event loop 的排程是靠 loop.call_soon(), loop.call_soon_threadsafe() , loop.call_later(), loop.call_at() 方法。
等等,那 loop.create_task() 是怎麼排程的?
前述的 loop.call_soon(), loop.call_soon_threadsafe() , loop.call_later(), loop.call_at() 4 個方法的參數 callback 都是單純的函式(function, subroutine),如果是代入 coroutine 的話,將會無法正常運作,例如下列程式碼:
import asyncio
async def main(loop):
await asyncio.sleep(1)
print("Goodbye from the custom event loop!")
loop.stop()
loop = asyncio.get_event_loop()
loop.call_soon(main, loop)
loop.run_forever()
如果試圖執行上述程式碼的話,將會出現 warning 訊息 RuntimeWarning: coroutine 'main' was never awaited
,然後 coroutine main()
沒有被執行,更看不到它預期會列印的文字,而且也沒有例外錯誤等情況, event loop 依然在運作。
針對這種情況,如果我們去細看 Handle object 的 _run()
方法的原始碼,就會發現它是用 Context object 的 run() 方法執行回呼函式(callback),如果我們的 callback 是 coroutine 的話,根本就不能正常執行,也不會有例外錯誤產生:
def _run(self):
try:
self._context.run(self._callback, *self._args)
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
...(略)...
我們可以用以下程式碼模擬出一樣的情況:
import contextvars
async def coro():
print('Hello')
context = contextvars.copy_context()
context.run(coro)
上述程式碼執行結果如下所示,如預期般僅出現 coroutine was never awaited 的 warning 訊息:
test.py:7: RuntimeWarning: coroutine 'coro' was never awaited
context.run(coro)
看到此處,如果未來看到 warning 訊息 RuntimeWarning: coroutine 'main' was never awaited
的話,就可以直覺想到是不是丟了 coroutine 要給 Handle object 執行囉!
要將 coroutine 交給 event loop 排程執行的話,需要呼叫 loop.create_task() 。
loop.create_task() 可以讓我們將 coroutine 放進 event loop 進行排程與執行,所以前述有問題的程式碼需要改為下列形式:
import asyncio
async def main(loop):
await asyncio.sleep(1)
print("Goodbye from the custom event loop!")
loop.stop()
loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()
上述程式碼執行之後,就可以看到 coroutine main()
正常執行,而且 event loop 也在執行 coroutine main()
之後正常結束。
有人可能會有疑問,是否 loop.create_task() 也是 1 種讓 event loop 排程的方法?
其實 loop.create_task()
最後還是呼叫 loop.call_soon()
進行排程。
以下是 loop.create_task()
的程式碼,可以看到它會將 coroutine 轉成 asyncio Task object:
再細追 Task object 的程式碼的話,就會發現它最後還是透過呼叫 loop.call_soon()
進行排程,只是排程的目標是 __step()
方法,而這個方法最終會負責執行 coroutine:
以下是 Task object 的 __step()
方法的程式碼,其中 self.__step_run_and_handle_result()
就是執行 coroutine 的關鍵:
def __step(self, exc=None):
if self.done():
raise exceptions.InvalidStateError(
f'_step(): already done: {self!r}, {exc!r}')
if self._must_cancel:
if not isinstance(exc, exceptions.CancelledError):
exc = self._make_cancelled_error()
self._must_cancel = False
self._fut_waiter = None
_enter_task(self._loop, self)
try:
self.__step_run_and_handle_result(exc)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.
繼續追看 Task object __step_run_and_handle_result()
方法的程式碼,就會發現執行 coroutine 的關鍵方法 — send()
方法或者 throw()
方法(詳見 PEP-0492):
def __step_run_and_handle_result(self, exc):
coro = self._coro
try:
if exc is None:
# We use the `send` method directly, because coroutines
# don't have `__iter__` and `__next__` methods.
result = coro.send(None)
else:
result = coro.throw(exc)
...(略)...
這就是 event loop 如何排程 coroutine 的說明,其實 loop.create_task()
最終還是透過前文所介紹的方法進行排程。
以上,就是關於 event loop 如何排程、執行工作的介紹啦!
總結
Event loop 是 Python asyncio 的重要核心,包含 coroutine, Task, Future 以及一般 function 都可以在 event loop 中執行。
Python 對 event loop 的封裝相當完善,因此多數情況下,我們不需要理解它的具體運作邏輯,也正因為如此,當 coroutine 的執行不如預期,或者遇到 Cannot run the event loop while another loop is running, coroutine was not awaitable 等錯誤時,我們往往不知道問題在哪裡,導致挫敗感,甚至使人退卻。
所以,花點時間理解 event loop 的運作,可以大大提高我們對 asyncio 的掌握與使用能力!
以上!
Enjoy!
References
PEP 492 – Coroutines with async and await syntax