Python asyncio 從不會到上路

Last updated on  Jul 1, 2024  in  Python 程式設計 - 高階  by  Amo Chen  ‐ 13 min read

自從 Python 3.4 推出 asyncio 模組之後,開發者在提升 Python 程式效能的解決方案上又多了 1 種選擇。

不過相較於較為人所熟知的 multiprocessingthreading 而言,大多數初學者並不習慣非同步式(asynchronous)式的開發思維,但只要能夠掌握 asyncio 模組中幾點重要的概念,即使是從未接觸過的初學者,也能夠慢慢掌握 asyncio 的使用方式。

本文將重點介紹 asyncio 模組中的重要概念,並透過實際範例理解 asyncio 的運作,從而學會如何使用 asyncio 模組。

本文環境

$ pip install aiohttp requests

Python 3.8 之後 asyncio 增加了許多方便的函式,建議使用 Python 3.8 或 Python 3.9 學習 asyncio, 痛苦會少一點。

Hello, asyncio

以下範例是常見的程式寫法,該範例在 do_requests() 函式中以 for 迴圈對 example.com 發出 10 次 HTTP GET 要求(request),並且列印其狀態碼(status code):

import requests


def do_requests():
    resp = requests.get('https://example.com')
    print('example.com =>', resp.status_code)


def main():
    for _ in range(0, 10):
        do_requests()


if __name__ == '__main__':
    main()

上述範例執行結果如下:

$ time python test.py
example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
python test.py 0.40s user 0.14s system 7% cpu 6.717 total

上述結果看到前述範例花了約 6 秒時間完成 10 次的 HTTP GET 以及列印狀態碼的任務。其執行次序就像下圖, do_requests() 一個接著一個依序做完,假設每次需要 0.6 秒完成 do_requests() ,那麼 10 次就需要花費 10 秒:

如果想要增加前述範例的效能,可以利用 multiprocessing 或 threading 模組將 10 次 do_requests() 交給若干 processes 或 threads 平行(concurrently)處理,假設有 5 個 processes 或 threads 可以平行處理,那麼理想上就只需花費約 10 / 5 * 0.6 = 1.2 秒,不過其缺點是會使用相對多的系統資源(如記憶體用量增加)。

除此之外,有沒有其他方法能夠增加程式效能呢?

do_requests() 為例,我們大致上可以知道 0.6 秒的時間,應有很大一部分都在處理網路相關的事務,例如發出 HTTP GET 要求後,等待遠方的伺服器回應的等待時間,但這些等待時間使得 CPU 也必須跟著等待,使得 CPU 沒辦法執行其他工作,這造成程式感覺很慢之外,也造成 CPU 閒置浪費運算資源。

於是 asyncio 橫空出世!

asyncio 能夠讓開發者針對類似上述提及的 I/O 等待時間造成程式效能低落的問題,將 CPU 從等待中解放,徹底利用 CPU 的運算資源。其原理為在程式中需要進行等待的地方,讓 CPU 切換執行其他工作,並稍後再切換回剛剛等待的地方確認是否等待已有結果,如果沒有結果就再進行切換執行其他工作,看起來也很像同時間平行執行很多工作一樣,但資源的使用上相對於使用 multiprocessing 或 threading 模組來得少。

以下是將先前範例改為 asyncio 的版本,可以不用理解每 1 行的意思,執行看看體驗 asyncio 的威能即可:

import aiohttp
import asyncio


def do_requests(session):
    return session.get('https://example.com')


async def main():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for _ in range(0, 10):
            tasks.append(do_requests(session))

        results = await asyncio.gather(*tasks)
        for r in results:
            print('example.com =>', r.status)


if __name__ == '__main__':
    asyncio.run(main())

上述範例執行結果如下,可以看到透過 asyncio 大大地縮短執行時間,從原本的 6 秒縮短至 0.971 秒,相當驚人。

$ time python test_async.py
example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
example.com => 200
python test_async.py  0.30s user 0.13s system 18% cpu 0.971 total

其執行的次序類似下圖,執行 do_requests() 到一半就先跳去執行其他工作,不過下圖經過很大的簡化,並非真正的執行過程,該圖僅是為了讓初學者對於 asyncio 在切換執行有個初步的樣貌:

Coroutines (或稱協程)

使用 asyncio 撰寫非同步程式的第 1 步,就得先了解什麼是 coroutine.

根據 Python 官方文件定義如下:

Coroutines are a more generalized form of subroutines. Subroutines are entered at one point and exited at another point. Coroutines can be entered, exited, and resumed at many different points. They can be implemented with the async def statement.

簡而言之, coroutine 具有開始(enter)/暫停(exit)以及任意恢復(resume)執行的能力,譬如前述範例中發出 HTTP GET 要求之後,就暫停執行該函式,轉而執行其他工作,等到該 HTTP GET 要求收到伺服器回應之後,再轉回來恢復執行剩下的工作。

而這種 coroutine 與一般用 def 定義的函式/方法不同,是以 async def 進行定義,例如前述範例中的 async def main(): 就是定義 1 個 coroutine, 也可以認為是透過 async def 明確告知 Python 該函式/方法具有非同步執行的能力。

另外 await 語法則是被用來告知 Python 可以在此處暫停執行 coroutine 轉而執行其他工作,而且該語法只能在 coroutine 函式內使用,因此 async defawait 通常會一起出現。

另 1 個關於 await 語法的重點是 await 之後只能接 awaitables 物件,例如 coroutine 或者是之後會介紹到的 Task, Future 以及有實作 __await__() 方法 的物件,所以不是所有的物件或操作都能夠用 await 進行暫停。

目前 Python 有 2 種實作 coroutines:

  1. native coroutines
  2. generator-based coroutines

第 1 種 native coroutines 是使用 async def 定義的函式。

第 2 種 generator-based coroutines 是用 @asyncio.coroutin 裝飾子寫成的函式,不過該種 coroutine 將於 Python 3.10 之後停止支援,因此不建議學習或者繼續使用。

為了方便,本文統一只使用 native coroutines 進行教學。

實際實作 1 個 coroutine 吧!

以下是 1 個簡單的 coroutine, 會以非同步的方式沈睡 1 秒 await asyncio.sleep(1) 之後列印出 hello :

import asyncio

async def main():
    await asyncio.sleep(1)
    print('hello')

main()

執行之後會出現以下內容,而且連預期的 hello 字串也沒出現:

$ python test.py
test.py:7: RuntimeWarning: coroutine 'main' was never awaited
  main()

這是由於 coroutine 本身的特性有別於一般函式,一定要透過 event loop 排程後執行。

所以前述範例的問題,可以透過呼叫 asyncio.run() 進行修正,該方法能透過 event loop 執行 coroutine 並回傳其結果,因此上述的範例可以修正為:

import asyncio

async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

成功執行的話,將可以看到上述範例正確印出 hello 字串。

順帶一提,當有 coroutine 內的 awaitable 物件忘記以 await 進行呼叫時,例如以下的 asyncio.sleep(1) 就會出現 RuntimeWarning ,雖然有可能無損程式的執行,但同時也代表該 awaitable 物件並沒有發揮其作用,例如以下的範例並沒有真正等待 1 秒之後才列印出 hello 字串:

import asyncio

async def main():
    asyncio.sleep(1)
    print('hello')

asyncio.run(main())

上述範例執行結果如下:

$ python test.py
test.py:4: RuntimeWarning: coroutine 'sleep' was never awaited
  asyncio.sleep(1)
hello

Event loop

初步認識 coroutines 之後,緊接著必須了解的是 event loop 。

Event loop 是 asyncio 模組的核心,用以負責執行非同步(asynchronous)的工作,例如前述範例中的 coroutine async def main() , 如果少了 event loop 的作用,那麼 coroutines 將無法被執行與回收其執行結果。

The event loop is the core of every asyncio application. Event loops run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses.

而實際上 event loop 是 1 個 Python 類別 BaseEventLoop ,正如其名, event loop 關鍵運作部分是 1 個無窮迴圈(原始碼),不斷地 loop 進行排程/執行非同步任務、回呼函式(callbacks)等工作, I/O 類的工作十分適合以非同步方式交由 event loop 執行,例如網路通訊、檔案讀寫等等,以利 event loop 進行工作切換。

原則上,我們不需針對 event loop 進行太多操作與干涉, Python 已經將 event loop 打理得十分易於使用,如果有需要的話,可以使用 asyncio.get_running_loop() 或者 asyncio.get_event_loop() 即可取得 event loop 實例(instance)以進行操作。

有興趣了解更多關於 event loop 的話,詳見你知道 asyncio 的 event loop 是怎麼 loop 的嗎?談 event loop 的排程與執行

Awaitables

在使用 asyncio 相關函式時,經常可以在文件中看到 awaitables 關鍵字,這個關鍵字就代表著以下 3 種 Python 物件(objects),也是 await 語法適用的對象:

  1. Coroutines
  2. Tasks - asyncio.Task
  3. Futures - asyncio.Future

asyncio 很多函式/方法(method)所需要的參數多半是上述 3 種不同類型的物件之一,因此一定要注意其差別,如果是 3 種皆可,通常會在文件中以 aw , *aws 或者 awaitables 說明。

Tasks

在 event loop 中,工作的執行是以 Task 為單位, event loop 一次僅會執行 1 個 Task, 如果某個 Task 正在等待執行結果,也就是執行到 await 的地方,那麼 event loop 將會暫停(suspend)並將之進行排程,接著切換執行其他 Task, 回呼函數(callback)或者執行某些 I/O 相關的操作。

Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task awaits for the completion of a Future, the event loop runs other Tasks, callbacks, or performs IO operations.

我們可以將 Task 視為是 coroutine 的再包裝,因此可以看到 asyncio.create_task() 函數接受的參數必須是 coroutine ,例如以下範例實際建立 1 個 Task 實例後,交由 event loop 執行:

import asyncio


async def coro():
    print('hello')
    await asyncio.sleep(1)
    print('world')


loop = asyncio.get_event_loop()
task = loop.create_task(coro())
loop.run_until_complete(task)

上述執行結果:

hello
world

順帶一提,官方範例中直接呼叫 asyncio.create_task() 時,可能會出現類似以下錯誤 no running event loop

Traceback (most recent call last):
  File "a.py", line 10, in <module>
    task = asyncio.create_task(coro())
  File "/.../versions/3.8.0/lib/python3.8/asyncio/tasks.py", line 381, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'coro' was never awaited

其原因在於我們沒有可使用的 event loop, 因此可以像前述範例取得 event loop 後,再使用 event loop 的 create_task() 方法將 coroutine 轉成 Task, 例如:

loop = asyncio.get_event_loop()
task = loop.create_task(coro())

此外 Task 也提供我們更多元/彈性的操作,例如取消 Task, 新增/刪除回呼函數( cancel(), add_done_callback() , remove_done_callback() )等等。

例如取消 Task 的範例:

import asyncio


async def cancel_me():
    print('cancel_me(): sleep')
    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')


async def main():
    print('main(): running')
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 5 second
    print('main(): sleep')
    await asyncio.sleep(5)

    print('main(): call cancel')
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print('main(): cancel_me is cancelled now')


asyncio.run(main())

上述範例執行結果如下:

main(): running
main(): sleep
cancel_me(): sleep
main(): call cancel
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now

從上述結果可以發現執行 task.cancel() 之前, task 就已經開始執行了,這是由於 await asyncio.sleep(5) 給了 event loop 切換執行 cancel_me() 的機會,所以我們才會看到在 cancel_me(): sleep 出現在 main(): call cancel 之前。

如果把 await asyncio.sleep(5) 註解掉就會看到 cancel_me() 連執行的機會都沒有就被取消了:

import asyncio


async def cancel_me():
    print('cancel_me(): sleep')
    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')


async def main():
    print('main(): running')
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 5 second
    #print('main(): sleep')
    #await asyncio.sleep(5)

    print('main(): call cancel')
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print('main(): cancel_me is cancelled now')


asyncio.run(main())

上述執行結果如下:

main(): running
main(): call cancel
main(): cancel_me is cancelled now

換言之,在呼叫 asyncio.create_task() 後, event loop 就已經接收到有 1 個 Task 需要執行,只是該 coroutine 需要有機會被 event loop 切換執行,而剛好 await asyncio.sleep(5) 恰好給了 event loop 切換執行的機會,這剛好對應到 Python 官方文件說明「 asyncio.create_task() 負責將 coroutine 包裝成 Task 並安排其執行」:

Wrap the coro coroutine into a Task and schedule its execution.

也因此, asyncio.create_task() 回傳的 Task 並不需要等到使用 await 才會被執行,我們可以使用以下的範例進行驗證,該範例中並沒有 await task 但依然能夠順利被 event loop 執行:

import asyncio


async def cancel_me():
    print('cancel_me(): sleep')
    for i in range(1, 101):
        print('cancel_me(): print', i)
        if i % 10 == 0:
            await asyncio.sleep(2)


async def main():
    print('main(): running')
    # Create a 'cancel_me' Task
    task = asyncio.create_task(cancel_me())

    # Wait for 60 second
    print('main(): sleep')
    await asyncio.sleep(60)


asyncio.run(main())

上述範例執行結果如下:

main(): running
main(): sleep
cancel_me(): sleep
cancel_me(): print 1
cancel_me(): print 2
cancel_me(): print 3
cancel_me(): print 4
cancel_me(): print 5
cancel_me(): print 6
cancel_me(): print 7
cancel_me(): print 8
cancel_me(): print 9

Futures

Task 繼承自 Future, 因此 Future 是相對底層(low-level)的 awaitable Python 物件,用以代表非同步操作的最終結果,一般並不需要自己創造 Future 物件進行操作,多以 coroutine 與 Task 為主。

不過仍有些 asyncio 模組的函式會回傳 Future 物件 ,例如 asyncio.run_coroutine_threadsafe() 回傳的就是 Future 物件,因此我們也需要稍微了解。

與 Task 不同的是, Future 物件並不是對 coroutine 進行再包裝,而是作為代表非同步操作最終結果的物件,因此該物件有 1 個 set_result() 方法,可以將結果寫入,同時該 Future 物件也會被標為結束(done)的狀態,所以 Future 物件通常會與 coroutines 或 Tasks 混搭使用,例如以下範例:

import asyncio


async def do_async_job(fut):
    await asyncio.sleep(2)
    fut.set_result('Hello future')


async def main():
    loop = asyncio.get_running_loop()

    future = loop.create_future()
    loop.create_task(do_async_job(future))

    # Wait until future has a result
    await future

    print(future.result())


asyncio.run(main())

上述範例先以 future = loop.create_future() 創造 1 個 Future 物件,並將其作為參數傳入 coroutine do_async_job 中,再以 fut.set_result('Hello future') 將結果寫入。由於 await 語法後也可以接 Future 物件,所以可以直接以 await future 等待到 future 被寫入結果為止,最後就能夠順利列印 future 的結果了。

如果有熟悉 Javascript Promise 的人,應該會覺得 Future 與 Promise 有些類似,兩者分別透過 set_result()resolve() 將結果寫入,並且將狀態設定為 donefulfilled

// 以下為 Javascript 程式碼片段
const promise = new Promise((resolve, reject) => {
  setTimeout(() => {
    resolve('foo');
  }, 300);
});

promise.then(v => console.log(v)) // 列印 foo

此外, Future 也與 Task 相同擁有取消 Task, 新增/刪除回呼函數等方法可供使用,礙於篇幅受限在此不多贅述。

asyncio.gather()

理解 asyncio 中的 awaitable 物件之後,將會對於 asyncio 中每個方法需要傳入什麼類型的參數具有一定的掌握力,例如本節將介紹的 asyncio.gather() 方法,其傳入參數為 *aws 即代表 awaitable 物件,所以我們可以同時傳入 coroutine, Task 甚至 Future 物件皆可, asyncio.gather() 會自動處理 awaitables, 例如將 coroutine 統一轉為 Task:

If any awaitable in aws is a coroutine, it is automatically scheduled as a Task.

asyncio.gather() 作用在於執行多個 awaitable 物件(一樣是透過 event loop),並收集每 1 個的回傳值存於一個串列(list)中:

Run awaitable objects in the aws sequence concurrently.

If all awaitables are completed successfully, the result is an aggregate list of returned values. The order of result values corresponds to the order of awaitables in aws.

因此實務上可以將多個執行相同任務的 coroutines, Tasks 或者 Futures 一起交由 asyncio.gather() 執行,例如未使用 asyncio.gather() 之前可能是以下程式碼:

import asyncio
import threading
from datetime import datetime


async def do_async_job():
    await asyncio.sleep(2)
    print(datetime.now().isoformat(), 'thread id', threading.current_thread().ident)


async def main():
    await do_async_job()
    await do_async_job()
    await do_async_job()


asyncio.run(main())

上述範例執行結果如下,從結果可以發現 3 個 do_async_job() 其實還是依序執行(而且都是同一個 thread id, 正好呼應 event loop 會負責執行 coroutines 的說明),依序執行並非是我們想要的結果,代表沒有徹底發揮 asyncio 切換執行的威力,我們依然有優化空間:

2021-07-31T23:10:04.956662 thread id 4602453504
2021-07-31T23:10:06.961217 thread id 4602453504
2021-07-31T23:10:08.965344 thread id 4602453504

因此前述範例可以使用 asyncio.gather() 進一步優化為以下形式:

import asyncio
import threading
from datetime import datetime


async def do_async_job():
    await asyncio.sleep(2)
    print(datetime.now().isoformat(), 'thread id', threading.current_thread().ident)


async def main():
    job1 = do_async_job()
    job2 = do_async_job()
    job3 = do_async_job()
    await asyncio.gather(job1, job2, job3)


asyncio.run(main())

上述範例執行結果如下,可以發現整體執行時間變短,而且 job1, job2, job3 之間的執行時間十分接近,正好再次呼應 asyncio.gather() 文件說明:

Run awaitable objects in the aws sequence concurrently.

2021-07-31T23:11:53.221184 thread id 4692635136
2021-07-31T23:11:53.221413 thread id 4692635136
2021-07-31T23:11:53.221469 thread id 4692635136

另外,如果需要存取 asyncio.gather() 的回傳值,可以使用 for 迴圈一一取出:

import asyncio
import random


async def do_async_job():
    await asyncio.sleep(2)
    return random.randint(1, 10)


async def main():
    job1 = do_async_job()
    job2 = do_async_job()
    job3 = do_async_job()
    return_values = await asyncio.gather(job1, job2, job3)
    for v in return_values:
        print('result =>', v)


asyncio.run(main())

上述範例執行結果如下:

result => 9
result => 4
result => 9

設定時限(Timeouts)

對於 event loop 來說,能夠持續切換執行不同工作的能力是相當重要的,如果有任何 1 個工作佔據 event loop 使其無法進行切換,那麼就會拖延其他工作的執行,導致其他工作都要等佔據 event loop 的工作完成才能繼續。

所以我們可以使用 asyncio.wait_for() 為 awaitables(coroutine, Task, Future) 設定 1 個時限,例如以下範例為 do_async_job() 設定 1 秒的時限,該 coroutine 將無法在 2 秒後印出字串,因為 asyncio.wait_for() 會取消(cancel)該工作的執行,並且 raise TimeoutError :

import asyncio


async def do_async_job():
    await asyncio.sleep(2)
    print('never print')


async def main():
    try:
        await asyncio.wait_for(do_async_job(), timeout=1)
    except asyncio.TimeoutError:
        print('timeout!')


asyncio.run(main())

上述範例執行結果如下:

$ python test.py
timeout!

實務上,我們可以利用 asyncio.wait_for() 為某個工作設定時限,當該工作超出時限時,捕捉 TimeoutError 執行備選方案,例如以下形式:

import asyncio


async def do_async_job():
    await ...

async def do_async_job_2nd_plan():
    await ...

async def main():
    try:
        await asyncio.wait_for(do_async_job(), timeout=1)
    except asyncio.TimeoutError:
        await asyncio.wait_for(do_async_job_2nd_plan(), timeout=2)


asyncio.run(main())

asyncio.to_thread()

由於 event loop 負責執行非同步(asynchronous)的工作,為了發揮 event loop 最大效能,我們都需要確保每 1 個 coroutine 中都需要有 await 的存在或者想辦法將執行時間很長的部分轉為 coroutine,使得 event loop 能夠有機會切換執行其他工作的機會,否則 event loop 遇到執行特別時間長的程式碼,又沒有 await 能夠讓 event loop 能夠轉為執行其他工作時,就會造成 event loop 阻塞,例如以下範例:

import asyncio
import threading

from time import sleep


def hard_work():
    print('thread id:', threading.get_ident())
    sleep(10)


async def do_async_job():
    hard_work()
    await asyncio.sleep(1)
    print('job done!')


async def main():
    task1 = asyncio.create_task(do_async_job())
    task2 = asyncio.create_task(do_async_job())
    task3 = asyncio.create_task(do_async_job())
    await asyncio.gather(task1, task2, task3)


asyncio.run(main())

上述範例執行結果如下,可以看到 task1, task2, task3 其實都各花 10 秒在 sleep ,同時也阻塞其他工作的進行,因此造成這個範例花費約 30 秒的時間執行:

$ time python test.py
thread id: 4495814080
thread id: 4495814080
thread id: 4495814080
job done!
job done!
job done!
python test.py  0.10s user 0.07s system 0% cpu 31.232 total

為了解決某些耗時執行的程式碼阻塞 event loop 的問題, Python 3.9 提供 asyncio.to_thread() 可以將耗時執行的部分丟至 event loop 以外的另 1 個 thread 中執行,每呼叫 1 次就會在 1 個新的 thread.

因此前述範例改為 await asyncio.to_thread() 之後就可解決 event loop 阻塞的問題:

import asyncio
import threading

from time import sleep


def hard_work():
    print('thread id:', threading.get_ident())
    sleep(10)


async def do_async_job():
    await asyncio.to_thread(hard_work)
    await asyncio.sleep(1)
    print('job done!')


async def main():
    task1 = asyncio.create_task(do_async_job())
    task2 = asyncio.create_task(do_async_job())
    task3 = asyncio.create_task(do_async_job())
    await asyncio.gather(task1, task2, task3)


asyncio.run(main())

上述範例執行結果如下,可以看到執行時間從 30 秒左右降至 10 秒左右,非常大的改善:

$ time python test.py
thread id: 123145336332288
thread id: 123145353121792
thread id: 123145369911296
job done!
job done!
job done!
python test.py  0.10s user 0.06s system 1% cpu 11.244 total

不過 Python 3.8 以下並沒有 asyncio.to_thread() 可以使用,但我們仍然可以利用 event loop 所提供的方法 loop.run_in_executor 達成相同效果。

loop.run_in_executor 能夠結合 concurrent 模組將工作交給其他執行緒或行程(process)執行, loop.run_in_executor 會回傳 Future 物件,所以需要以 await 告訴 event loop 等待其結果。

前述 asyncio.to_thread() 的範例則可以改成以下:

import asyncio
import concurrent
import threading

from time import sleep


def hard_work():
    print('thread id:', threading.get_ident())
    sleep(10)


async def do_async_job(loop, pool):
    await loop.run_in_executor(pool, hard_work)
    await asyncio.sleep(1)
    print('job done!')


async def main():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        task1 = asyncio.create_task(do_async_job(loop, pool))
        task2 = asyncio.create_task(do_async_job(loop, pool))
        task3 = asyncio.create_task(do_async_job(loop, pool))
        await asyncio.gather(task1, task2, task3)


asyncio.run(main())

上述範例執行結果如下,可以看到使用 loop.run_in_executor 能夠結合 concurrent 模組也能夠達成與 asyncio.to_thread() 相同的效果:

$ time python test.py
thread id: 123145365553152
thread id: 123145382342656
thread id: 123145399132160
job done!
job done!
job done!
python test.py  0.11s user 0.08s system 1% cpu 11.297 total

順帶一提,如果是 I/O bound 的工作,可以使用 concurrent.futures.ThreadPoolExecutor() 進行處理。

如果是處理 CPU-bound 的工作則可改為 concurrent.futures.ProcessPoolExecutor() .

async with

最後回來看本文一開始 AIOHTTP 範例中出現的 async with 語法。

Python 淺談 with 語句 一文介紹實作 __enter__()__exit__() 2 個方法的的類別,即可稱為 context manager, 並且能夠用 with 語句使用該 context manager, 例如以下就是 1 context manager 的實作:

class Manager(object):
    def __enter__(self):
        print('entering context')

    def __exit__(self, exc_type, exc_value, traceback):
        print('exiting context')


with Manager():
    print('in context')

上述範例執行結果如下:

entering context
in context
exiting context

async with 則是為了 async 版的 context manager 而新增的語句。

async 版的 context manager 則是實作 __aenter__()__aexit__() 2 個方法即可。

不過 __aenter__()__aexit__() 限制開發者一定要回傳 awaitable 物件,如果不回傳 awaitable 物件,將會導致程式無法順利執行,因此在實作 async 版的 context manager 必須注意該限制。

以下是實作 async context manager 的範例:

import asyncio


class AsyncContextManager:
    def __init__(self):
        self.loop = asyncio.get_event_loop()

    async def __aenter__(self):
        print('entering context')
        return self.loop.create_future()

    async def __aexit__(self, exc_type, exc, tb):
        print('exiting context')
        return self.loop.create_future()


async def main():
    async with AsyncContextManager() as acm:
        print('in context')


asyncio.run(main())

上述範例執行結果如下:

entering context
in context
exiting context

async for

Python 中的 iterator 指的是有實作 __iter__()__next__() 2 個方法的物件,因此 iterator 可以用 for 語句走訪。而 async for 則是為了 async 版的 iterator 而新增的語法, async 版的 iterator 則是需要實作 __aiter__()__anext()__ 2 個方法。

以下是實作 async iterator 的範例,可以看到 __anext__() 已經被轉為 coroutine, 因此可以在該 iterator 內透過 event loop 執行非同步的工作,使得我們在執行 for 迴圈時不會阻塞 event loop。

import asyncio


class AsyncCounter(object):

    def __init__(self, stop=None):
        self.count = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        await asyncio.sleep(1)
        self.count += 1
        if self.stop == self.count:
            raise StopAsyncIteration
        return self.count


async def main():
    async for i in AsyncCounter(11):
        print(i)


asyncio.run(main())

上述範例執行結果如下:

$ python async_iterator.py
1
2
3
4
5
6
7
8
9
10

替換 Event Loop

截至目前為止,我們使用的都是 Python 內建的 event loop ,不過 Python 也有提供替換 event loop 的選項,除了開發者可以實作符合自身需求的 event loop 之外,也可以使用其他人所開發的 event loop 。

知名的 uvloop 是目前經常被用來替代 Python 內建的 event loop 的解決方案。

uvloop 使用的是 libuv 作為其底層操作 asynchronous I/O 的函式庫,該函式庫也是 Node.js 所使用的函式庫,而 uvloop 的官方測試數據表示使用 uvloop 可以讓 Python asyncio 快 2-4 倍!

使用 uvloop 的方法也很簡單,只要安裝 uvloop 套件:

$ pip install uvloop

並將 asyncio.run() 換成 uvloop.run() 即可,例如前述程式碼可以改成下列:

import uvloop
import asyncio


class AsyncCounter(object):

    def __init__(self, stop=None):
        self.count = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        await asyncio.sleep(1)
        self.count += 1
        if self.stop == self.count:
            raise StopAsyncIteration
        return self.count


async def main():
    async for i in AsyncCounter(11):
        print(i)


uvloop.run(main())

是否超級簡單呢?

延伸閱讀1

如果想更精通 Asyncio 可以參考 O’reilly 所出版的 Using Asyncio in Python: Understanding Python’s Asynchronous Programming Features, 或者由良葛格翻譯的中文版 Python非同步設計:使用Asyncio

也可以觀看影片 David Beazley - Python Concurrency From the Ground Up

另外,看完本文可以接著閱讀分分鐘教你學會使用 Python AIOHTTP ,肯定可以駕輕就熟!

結語

實際上 asyncio 並不難,只要認識幾個重要的概念就能夠輕鬆上手,而且熟悉 asyncio 之後會發現在加速程式效能的解決方案上,有時使用 asyncio 會比起使用 multiprocessing 或 threads 來得更加簡便。

以上, happy coding!

References

https://www.python.org/dev/peps/pep-0492/

https://docs.python.org/3/library/asyncio.html

https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools


  1. 感謝 Dboy Laio 提供延伸閱讀 ↩︎

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!