multiprocessing 模組進階篇 - Pipe, Queue, Array, RawArray 以及 Structure 之教學範例

Posted on  Aug 7, 2023  in  Python 程式設計 - 中階  by  Amo Chen  ‐ 5 min read

用範例輕鬆學 Python multiprocessing 模組 一文中提到 4 種 IPC(Inter Process Communication)方法,分別是:

  • 以參數(args)的方式傳遞資料
  • 以共享記憶體(Shared Memory)中的 Value 物件傳遞資料
  • 以 fork 方式傳遞資料, fork 出來的子 process 會繼承父 process 的資源,所以可以存取原本父 process 內的資料
  • 透過 Manager 共享資料, Manager 會在一個稱為 server process 的 process 中管理共享的資料,並且代理其他 process 操作這些共享資料

除了上述幾種方式之外, Python 也有提供其他方式滿足開發者對於 IPC(Inter Process Communication) 的相關需求,本文將額外介紹 Pipe, Queue, Array, RawArray 4 種方式。

本文環境

  • Python 3

Pipe

如果 2 個 Process 之間需要溝通,可以簡單透過 Pipe() 進行, Pipe() 會回傳 2 個 Connection 物件,分別代表管道的 2 端, 2 個 process 各持一端,就能夠進行溝通,如下圖所示 Process A 與 Process B 需要各持 1 個 Connection, 透過該 Connection 發送與接送資料:

以下是 2 個 Processes 使用 Pipe 進行 IPC 的範例:

import os

from multiprocessing import Process, Pipe


def do_job(conn):
    pid = os.getpid()
    while True:
        i = conn.recv()
        if i is None:
           print(f'{pid}| Nothing to do. Exit.', flush=True)
           conn.send(None)
           conn.close()
           break
        print(f'{pid}| Got {i} from the pipe.', flush=True)
        conn.send(i + 5)


if __name__ == '__main__':
    pid = os.getpid()
    conn1, conn2 = Pipe()

    p = Process(target=do_job, args=(conn2, ))
    p.start()

    for i in range(1, 11):
        conn1.send(i)
    conn1.send(None)

    while True:
        result = conn1.recv()
        if result is None:
           conn1.close()
           print(f'{pid}| Close the pipe', flush=True)
           break
        print(f'{pid}| Receive {result} from the pipe', flush=True)

上述範例 conn1, conn2 = Pipe() 取得 2 個 Connection, 並透過 args 的方式將 conn2 指派給子 process, 也就是 p = Process(target=do_job, args=(conn2, )) 接著用 1 個 for 迴圈送了 10 次資料給子 process, 子 process 會在收到資料之後把資料 +5 之後再送回去。

上述範例執行結果如下,可以看到有 2 個不同的 PID 在運作:

7985| Got 1 from the pipe.
7985| Got 2 from the pipe.
7985| Got 3 from the pipe.
7985| Got 4 from the pipe.
7985| Got 5 from the pipe.
7985| Got 6 from the pipe.
7985| Got 7 from the pipe.
7985| Got 8 from the pipe.
7985| Got 9 from the pipe.
7934| Receive 6 from the pipe
7985| Got 10 from the pipe.
7934| Receive 7 from the pipe
7985| Nothing to do. Exit.
7934| Receive 8 from the pipe
7934| Receive 9 from the pipe
7934| Receive 10 from the pipe
7934| Receive 11 from the pipe
7934| Receive 12 from the pipe
7934| Receive 13 from the pipe
7934| Receive 14 from the pipe
7934| Receive 15 from the pipe
7934| Close the pipe

Queue

Queue 是用 Pipe 與 locs/semaphores 實作而成,與 Pipe 不同的是:

  • Queue 可以設定 queue size, 預設為無限制
  • Queue 可以同時給多個 Processes 存取

下列是使用 Queue 進行 IPC 的範例:

import os

from multiprocessing import Process, Queue


def do_job(queue):
    pid = os.getpid()
    while True:
        data = queue.get()
        if data is None:
           print(f'{pid}| Nothing to do. Exit.', flush=True)
           break
        print(f'{pid}| Got {data} from the queue.', flush=True)


if __name__ == '__main__':
    queue = Queue()

    processes = [
        Process(target=do_job, args=(queue, ))
        for _ in range(2)
    ]

    for p in processes:
        p.start()  # 執行 process

    for i in range(1, 21):
        queue.put(i)  # 放入資料

    queue.put(None)
    queue.put(None)

    for p in processes:
        p.join()  # 等待 process 執行結束

上述範例中 queue = Queue() 取得 1 個 Queue, 接著使用 for 迴圈將 Queue 傳遞給 2 個 Processes, 並使用 for 迴圈將資料放到 Queue 之中,這些資料會在 do_job() 函式中被 queue.get() 取得後列印出來。

值得注意的是 queue.get() 預設為阻塞式(block), Process 會等到 get() 取得資料後,才繼續執行,如果不想讓它阻塞 Process 可以改為 get(false) ,如果無法取得資料,將會拋出 queue.Empty 例外(exception)。

p.s. queue.get() 也可以設定 timeout (單位為秒,必須為正整數)。

前述範例執行結果如下,可以看到 2 個 Processes 都能順利從 Queue 中取得資料:

4394| Got 1 from the queue.
4395| Got 2 from the queue
4394| Got 3 from the queue
4395| Got 4 from the queue
4394| Got 5 from the queue
4395| Got 6 from the queue
4394| Got 7 from the queue
4395| Got 8 from the queue
4394| Got 9 from the queue
4395| Got 10 from the queue
4394| Got 11 from the queue
4395| Got 12 from the queue
4394| Got 13 from the queue
4395| Got 14 from the queue
4394| Got 15 from the queue
4395| Got 16 from the queue
4394| Got 17 from the queue
4395| Got 18 from the queue
4394| Got 19 from the queue
4395| Got 20 from the queue
4394| Nothing to do. Exit.
4395| Nothing to do. Exit.

Array

用範例輕鬆學 Python multiprocessing 模組 一文中我們介紹 Value 的使用方法, Value 僅代表 1 個值,如果需要傳遞多個相同型態(type)的資料,則可以利用 Array, Array 與 Value 相同都可以使用 acquire() , release() 等 Lock 的方法,避免 race condition 的問題發生。

下列是使用 Array 進行 IPC 的範例程式:

import os
import random

from multiprocessing import Process, Array


def do_job(arr):
    pid = os.getpid()
    print(f'{pid}| Got the Array')
    for i in range(len(arr)):
        arr.acquire()
        arr[i] = random.randint(0, 100)
        arr.release()


if __name__ == '__main__':
    pid = os.getpid()
    arr = Array('d', [0]*10)

    print(f'{pid}| Pass the Array')
    p = Process(target=do_job, args=(arr, ))
    p.start()
    p.join()

    print(f'{pid}| Print the Array')
    for i in arr:
        print(f'{pid}| {i}')

上述範例中使用 Array('d', [0]*10) 建立 1 個 size 為 10 的整數陣列,並將該陣列以 args 方式傳遞給新建立的 Process, 也就是 Process(target=do_job, args=(arr, )) 該 Process 會將陣列 arr 代入 do_job() 執行,該 Process 會將該陣列填入隨機整數,每次填入時都會呼叫 acquire() 以獲得鎖,填入隨機整數之後,再呼叫 release() 已釋放鎖。

前述範例的執行結果如下,可以看到 2 個不同的 PID 在傳送與接收資料:

10108| Pass the Array
10159| Got the Array
10108| Print the Array
10108| 71.0
10108| 66.0
10108| 78.0
10108| 24.0
10108| 87.0
10108| 0.0
10108| 54.0
10108| 0.0
10108| 50.0
10108| 45.0

RawArray

Note that setting and getting an element is potentially non-atomic – use Array() instead to make sure that access is automatically synchronized using a lock.

RawArray 與 Array 相似,用於 IPC 以傳遞多個相同型態(type)的資料,但是 RawArray 並不保證寫入、讀取具備原子性(atomic), 也沒有 acquire() , release() 等 Lock 相關方法可以呼叫,如果多個 Process 同時存取,很高機率會有 race condition 問題發生。

因此, RawArray 只適合用來傳遞唯讀(read only)的資料。

以下是使用 RawArray 進行 IPC 的範例:

import os
import random

from multiprocessing import Process, RawArray


def do_job(array):
    pid = os.getpid()
    print(f'{pid}| Got the RawArray')
    for i in array:
        print(f'{pid}| {i}')



if __name__ == '__main__':
    pid = os.getpid()
    arr = RawArray('d', [random.randint(0, 100) for _ in range(10)])

    print(f'{pid}| Pass the RawArray')
    p = Process(target=do_job, args=(arr, ))
    p.start()
    p.join()

上述範例中使用 RawArray('d', [random.randint(0, 100) for _ in range(10)]) 建立 1 個 size 為 10 的隨機整數陣列,並在 Process(target=do_job, args=(arr, )) 中傳遞給子 Process 。

前述範例執行結果如下,可以看到 2 個不同的 PID 在傳送與接收資料:

9317| Pass the RawArray
9368| Got the RawArray
9368| 41.0
9368| 51.0
9368| 73.0
9368| 85.0
9368| 8.0
9368| 92.0
9368| 91.0
9368| 6.0
9368| 43.0
9368| 14.0

p.s. Value 也有 1 個相似的 RawValue 可以使用, RawValue 與 RawArray 同樣不保證原子性(atomic)

定義複雜的資料型態 - Structure

目前為止,本文都是定義簡單的資料型態並進行 IPC, 不過有些時候我們需要定義更複雜的資料結構,例如定義 1 個稱為 User 的結構,裡面有 name 與 age 兩種屬性,這時需要使用 ctypes.Structure 定義資料結構,例如:

from ctypes import Structure, c_ushort, c_wchar


class User(Structure):
    _fields_ = [
        ('name', c_wchar * 5),
        ('age', c_ushort),
    ]

只要讓資料型態繼承 ctypes.Structure 並在 _fields_ 定義清楚屬性, _fields_ 是 1 個陣列,每 1 個陣列元素都必須符合 (屬性名稱, 資料型態) tuple 格式,如上述範例所示, ('name', c_wchar * 5) 代表 name 屬性為 5 個字元長度的字串, ('age', c_ushort) 則代表 age 為 unsigned short, 更多關於型態可以參考 Fundamental data types 表格。

定義好資料型態之後,就可以將資料型態作為 Value, RawValue, Array, RawArray 的第 1 個參數,例如 RawArray(User, [('Joe', 18, ), ('Nemo', 9, )]) 代表定義 1 個元素型態為 User 的陣列,其陣列值為 [('Joe', 18, ), ('Nemo', 9, )], Python 會自動按 _fields_ 的順序將 Joe 指派給 name 屬性, 18 指派給 age 屬性,以此類推。

使用 ctypes.Structure 與 RawArray 的完整範例如下:

import os

from multiprocessing import Process, RawArray
from ctypes import Structure, c_ushort, c_wchar


class User(Structure):
    _fields_ = [
        ('name', c_wchar * 5),
        ('age', c_ushort),
    ]


def do_job(arr):
    pid = os.getpid()
    print(f'{pid}| Got the array', flush=True)
    for i in arr:
        print(f'{pid}| Name: {i.name}, Age: {i.age}', flush=True)
    print(f'{pid}| Exit', flush=True)



if __name__ == '__main__':
    arr = RawArray(User, [('Joe', 18, ), ('Nemo', 9, )])
    p = Process(target=do_job, args=(arr, ))
    p.start()
    p.join()

上述範例執行結果如下,可以看到我們能夠在另一個 Process 中讀取到自定義的資料型態:

27208| Got the array
27208| Name: Joe, Age: 18
27208| Name: Nemo, Age: 9
27208| Exit

總結

學會使用各式各樣的 IPC 方法,不同的方法有不同的應用場景,例如 Queue 適合先進先出以及多個 Processes 存取的情境, Pipe 適合僅有 2 個 processes 相互溝通的情況, Array 適合傳遞多個相同型態的資料。

理解方法的差異,才能靈活應付各種不同問題。

以上! Happy Coding!

References

multiprocessing — Process-based parallelism

ctypes — A foreign function library for Python

7 Ways to Share a Numpy Array Between Processes

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!