Python aiomultiprocess 套件是如何運作的?

Posted on  Jul 31, 2024  in  Python 程式設計 - 高階  by  Amo Chen  ‐ 5 min read

aiomultiprocess 是 1 個既實用又有趣的套件。

它實用的地方在於能將多個 coroutines 分散到多個 processes 執行(底層使用 multiprocessing 模組),藉此提升 asyncio 的效能。

而有趣的地方在於 multiprocessing 並不是 1 個 asynchronous 模組,而 aiomultiprocess 卻能將 multiprocessing 與 asyncio 整合在一起 ,著實相當有趣!

如果沒有相當的程式設計功力,其實也難以想像 aiomultiprocess 底層是如何實作的。因此本文將研究 aiomultiprocess 套件如何運作,以及如何整合 asyncio 與 multiprocessing 兩個模組的方法,揭開它的神秘面紗!

本文環境

  • Python 3
  • aiomultiprocess
  • aiohttp
$ pip install aiomultiprocess aiohttp

aiomultiprocess 簡介

aiomultiprocess 是 1 個結合 asyncio 與 multiprocessing 2 個模組的套件。

aiomultiprocess 能將多個 coroutines 分散到多個 processes 執行,其運作原理其實是每個 process 各自運作 1 個 event loop 以執行 coroutines。

之所以會有 aiomultiprocess 是因為 asyncio 雖然可以執行多個 tasks,不過它的效能會被 GIL 所限制,結合 multiprocessing 就可以突破 GIL 的限制。

以下是 1 個簡單的 aiomultiprocess 的範例程式碼,該範例會對 urls 中的網址發出 HTTP GET requests,同時這些 requests 會在不同的 queue 中分散給不同的 process 執行:

import asyncio
import os
from aiohttp import request
from aiomultiprocess import Pool

async def get(url):
    pid = os.getpid()
    async with request("GET", url) as response:
        return (pid, url, response.status)

async def main():
    urls = ["https://example.com", "https://example.com"]
    async with Pool(queuecount=2) as pool:
        async for (pid, url, status) in pool.map(get, urls):
            print(f'[{pid}] {url} => {status}')

if __name__ == '__main__':
    asyncio.run(main())

上述程式碼執行結果如下,可以看到我們成功將 coroutine get() 分散到不同的 process 執行:

[44124] https://example.com => 200
[44125] https://example.com => 200
[44124] https://example.com => 200
[44125] https://example.com => 200

此處之所以需要指定 queuecount=2 是因為 aiomultiprocess 預設為所有 processes (預設會建立與 CPU 核心數相同的 processes)共享 1 個 queue,所以只要有 1 個 process 執行的速度特別快,就有可能都是同 1 個 process 執行工作。

queuecount-01.png

因此前述程式碼故意將 queue 設定為 2 個,讓其他 process 也可以接到工作。

queuecount-02.png

接下來,開始研究 aiomultiprocess 是怎麼將 asyncio 與 multiprocessing 結合在一起的。

Pool 是怎麼啟動多個 processes 的?

前述章節,我們已經知道 aiomultiprocess 其實是透過 Queue 將要分散執行的工作分派給 child processes。

那麼,aiomultiprocess 是怎麼啟動多個 processes 的呢?其實就寫在 Pool 類別的 init() 方法內:

def init(self) -> None:
    """
    Create the initial mapping of processes and queues.

    :meta private:
    """
    for _ in range(self.queue_count):
        tx = self.context.Queue()
        rx = self.context.Queue()
        qid = self.scheduler.register_queue(tx)

        self.queues[qid] = (tx, rx)

    qids = list(self.queues.keys())
    for i in range(self.process_count):
        qid = qids[i % self.queue_count]
        self.processes[self.create_worker(qid)] = qid
        self.scheduler.register_process(qid)

從上述程式碼可以看到 self.queue_count,該屬性就是前述章節提到的 queuecount,而 tx , rx 對應的是待執行的工作 queue 與儲存執行結果的 queue,它們是成對的,如果 queuecount=2,那就會有 2 對 txrx,這些 queues 會被放到 self.queues 屬性中,作為 self.scheduler 後續分派工作之用。

接著第 2 個 for 迴圈,就是分配不同的 queue 給不同的 child process 使用,此處可以看到 aiomultiprocess 使用 i % self.queue_count 依序指派 queue 給不同的 child process 使用。

create_worker() 方法如下所示:

def create_worker(self, qid: QueueID) -> Process:
    """
    Create a worker process attached to the given transmit and receive queues.

    :meta private:
    """
    tx, rx = self.queues[qid]
    process = PoolWorker(
        tx,
        rx,
        self.maxtasksperchild,
        self.childconcurrency,
        initializer=self.initializer,
        initargs=self.initargs,
        loop_initializer=self.loop_initializer,
        exception_handler=self.exception_handler,
    )
    process.start()
    return process

可以簡單視為與以下 multiprocessing.Process 範例程式碼等效:

from multiprocessing import Process

p = Process(target=..., args=...)
p.start()

PoolWorker 類別其實就是 multiprocessing.Process 類別的再包裝,它會在初始化之後,就執行它本身的 run() 方法,持續等待接收來自 queue 的工作:

PoolWorker-init.png

run() 方法其實是 1 個 while loop,不斷地從 queue 取得工作、執行、將結果放到儲存結果的 queue 中:

poolworker-run.png

值得注意的是此處都一律使用 get_nowait()put_nowait 是為了避免卡住 event loop,所以必須使用 *_nowait()。( event loop 最怕 long-running non-async task 卡住它,讓它無法切換執行其他 coroutines )

至於每個 child process 的 event loop 是在哪裡建立的呢?

答案是在 PoolWorker 的父類別 — Process 類別(非 multiprocessing.Process 類別)。

而真正負責呼叫 multiprocessing.Process 也是在 Process 類別的 __init__() 方法內:

process-init.png

Process.run_async() 就是負責建立 event loop 的地方:

@staticmethod
def run_async(unit: Unit) -> R:  # type: ignore[type-var]
    """Initialize the child process and event loop, then execute the coroutine."""
    try:
        if unit.loop_initializer is None:
            loop = asyncio.new_event_loop()
        else:
            loop = unit.loop_initializer()

        asyncio.set_event_loop(loop)

        if unit.initializer:
            unit.initializer(*unit.initargs)

        result: R = loop.run_until_complete(unit.target(*unit.args, **unit.kwargs))

        return result

    except BaseException:
        log.exception(f"aio process {os.getpid()} failed")
        raise

這邊就可以看到 loop.run_until_complete(unit.target(*unit.args, **unit.kwargs)) 會讓 process 執行前述的 PoolWorker.run() 方法。所以,真正建立 multiprocessing.Process 物件與 event loop 的都是 PoolWorker 的父類別 Process 類別。

至此,我們已經知道 aiomultiprocess 如何在多個 processes 中啟動 event loop 與接收工作。

aiomultiprocess 是怎麼執行 coroutine 的?

接著看 aiomultiprocess 是如何執行 coroutine 的,舉 Pool.map() 方法為例:

def map(
    self,
    func: Callable[[T], Awaitable[R]],
    iterable: Sequence[T],
    # chunksize: int = None,  # todo: implement chunking maybe
) -> PoolResult[R]:
    """Run a coroutine once for each item in the iterable."""
    if not self.running:
        raise RuntimeError("pool is closed")

    tids = [self.queue_work(func, (item,), {}) for item in iterable]
    return PoolResult(self, tids)

Pool.map() 其實是呼叫 Pool.queue_work() 方法把 coroutine 放到 queue 裡,也就是下列程式碼中的 tx.put_nowait((task_id, func, args, kwargs))

def queue_work(
    self,
    func: Callable[..., Awaitable[R]],
    args: Sequence[Any],
    kwargs: Dict[str, Any],
) -> TaskID:
    """
    Add a new work item to the outgoing queue.

    :meta private:
    """
    self.last_id += 1
    task_id = TaskID(self.last_id)

    qid = self.scheduler.schedule_task(task_id, func, args, kwargs)
    tx, _ = self.queues[qid]
    tx.put_nowait((task_id, func, args, kwargs))
    return task_id

至於 queue 可能有多個存在,所以要使用哪個 queue 是由 scheduler 所決定,預設使用的是 RoundRobin scheduler,輪流分配到每個 queue,在此不多贅述。

aiomultiprocess 是如何回收 coroutines 的執行結果

當我們呼叫 Pool.map() 之後,會得到 1 個 PoolResult 物件,該物件有實作 __aiter__ 方法,所以可以用 async for 的方式走訪回收 coroutines 的執行結果:

def __aiter__(self) -> AsyncIterator[_T]:
    """Return results one-by-one as they are ready"""
    return self.results_generator()

async def results_generator(self) -> AsyncIterator[_T]:
    """Return results one-by-one as they are ready"""
    for task_id in self.task_ids:
        yield (await self.pool.results([task_id]))[0]

這些結果其實存在 Pool 物件的 results() 方法內:

async def results(self, tids: Sequence[TaskID]) -> Sequence[R]:
    """
    Wait for all tasks to complete, and return results, preserving order.

    :meta private:
    """
    pending = set(tids)
    ready: Dict[TaskID, R] = {}

    while pending:
        for tid in pending.copy():
            if tid in self._results:
                result, tb = self._results.pop(tid)
                if tb is not None:
                    raise ProxyException(tb)
                ready[tid] = result
                pending.remove(tid)

        await asyncio.sleep(0.005)

    return [ready[tid] for tid in tids]

而 Pool 會在 loop() 方法中回收 rx queue 中的執行結果:

pool-loop.png

Pool.loop() 方法會在 Pool 物件建立完各個 child processes 之後被呼叫:

pool-init-loop.png

如此一來就打通從 main process 到 child processes 之間的雙向溝通。這就是 aiomulitprocess 大致的運作方式。

做一個簡易版的 aiomultiprocess

知道 aiomultiprocess 的運作方式之後,我們就可以用類似的方式做出類似的功能。

我們同樣用 1 組成對的 queues 與 2 個 child processes 做 asyncio 與 multiprocessing 的整合:

import asyncio
import os
import queue
from multiprocessing import Process, Queue

from aiohttp import request


async def run_async(tx, rx):
    while True:
        try:
            co, args = tx.get_nowait()
            await asyncio.sleep(3) # let the other process to get the next task
        except queue.Empty:
            await asyncio.sleep(1)
        except TypeError:
            break
        else:
            fut = asyncio.ensure_future(co(*args))
            done, _ = await asyncio.wait([fut])
            if done:
                for future in done:
                    rx.put(future.result())

def run(tx, rx):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run_async(tx, rx))


async def get(url):
    pid = os.getpid()
    async with request("GET", url) as response:
        return (pid, url, response.status)

async def main():
    tx = Queue()
    rx = Queue()

    p1 = Process(target=run, args=(tx, rx))
    p1.start()

    p2 = Process(target=run, args=(tx, rx))
    p2.start()

    urls = ["https://example.com", "https://example.com"]
    for url in urls:
        tx.put((get, (url, )))

    count = 0
    while True:
        try:
            print(rx.get_nowait())
            count += 1
        except queue.Empty:
            pass
        if count == len(urls):
            break
        await asyncio.sleep(1)

    tx.put(None)
    tx.put(None)

if __name__ == '__main__':
    asyncio.run(main())

上述範例程式執行結果如下:

(84553, 'https://example.com', 200)
(84552, 'https://example.com', 200)

基本上,前述程式碼為 aiomultiprocess 的精簡化版本,兩者做的事相差無幾,因此不多加贅述前述程式碼的作用囉!

總結

aiomultiprocess 其實是蠻值得閱讀的 Python 套件,研究這個套件你會學到如何把 asyncio 與 multiprocessing 整合在一起之外,還能提升 asyncio 與 multiprocessing 兩個模組的掌握能力!

以上!

Enjoy!

References

omnilib/aiomultiprocess: Take a modern Python codebase to the next level of performance.

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!