Python 內建的 multiprocessing 是相當重要的模組,如果有平行(parallelism)處理的需求,除了內建的 threading 模組之外,另一個就屬 multiprocessing 。

使用 multiprocessing 的好處在於能夠很大程度避開 Python GIL 對於程式效能的影響,但壞處則是對於記憶體的耗用程度也較高,即使如此也是一個必須了解的模組。

本篇將實際透過幾個範例學習如何使用 multiprocessing 模組。

本文環境

  • Python 3.7.0
  • macOS 10.15

multiprocessing.Pool

Pool 是 multiprocessing 模組中相當方便的類別, Pool 提供簡單的方法,讓我們能夠定義 workers 的個數,也就是多少個平行處理的 Processes 數,例如 Pool(4) 則代表會有 4 個平行處理的 Processes 。

以下是使用 Pool 寄信給 99 位使用者的範例程式:

from multiprocessing import Pool


def send_mail(username, coupon_code, extra_sentence):
    print(username, coupon_code, extra_sentence or '')


with Pool(4) as pool:
    for idx in range(1, 100):
        pool.apply_async(
            send_mail,
            (f'user{idx:0>2}', f'vipcode{idx:0>2}'),
            {'extra_sentence': f'{idx:0>4}'}
        )
    pool.close()
    pool.join()

上述範例在第 8 行建立 4 個 workers (也就是 4 個 processes)後,接著在第 9, 10 行的迴圈內將 send_email 函式以及呼叫 send_email 函式所需要的參數以 apply_async 的方式交給 pool 加到佇列(queue) 中等待執行。

接著 ·pool.close() 則是告訴 pool 沒有其他資料與任務需要加到佇列(queue) 中了。注意,此時所有的 Workers 並不會直接開始工作喔,而是直到 pool.join() 被呼叫之後,所有工作才會開始被執行。

以下是執行結果,可以看到結果中的數字並不是連續的,這是由於 Workers 執行速度不ㄧ所導致的,純屬正常現象。這也代表在平行處理的世界中,我們通常無法預測執行與結束的次序,如果你需要保證執行或結束的次序,那麼平行處理可能就不適合你使用。

user01 vipcode01 0001
user02 vipcode02 0002
user03 vipcode03 0003
user05 vipcode05 0005
user06 vipcode06 0006
user07 vipcode07 0007
user04 vipcode04 0004
user08 vipcode08 0008
...

Parent process 分享資料(shared memory) Child process

初體驗 Pool 的功能之後,有個觀念需要認識一下。

一個 Python 程序(假設為 A)如果利用 Pool/Process 產生出其他的程序(假設為 B, C, D)來協同工作,那麼 A 就是 parent process , B, C, D 則是 child processes 。

而 Processes 之間的記憶體基本上是獨立的,如果要溝通就得透過 IPC(Inter-process communication) 技術,常見的 IPC 技術有幾種:

  1. 透過檔案,例如 A Process 將資料寫至一個檔案後由 B Process 進行讀取,達成溝通的效果
  2. Socket ,透過 TCP/UDP 等網路協定進行溝通
  3. Shared memory ,透過共享記憶體區塊進行溝通
  4. Signal ,透過作業系統提供的信號機制進行溝通,例如 A Process 發出信號給 B Process , B Process 針對不同信號做出不同的處理,達成溝通的效果

當然, IPC 技術不只有上述 4 種,其他可以查閱 Inter-process communication - Wikipedia

Python 的 multiprocessing 模組也有提供 Shared memory 的方式讓我們能夠實作 Processes 之間的溝通,例如以下範例中的 Value('d', 0.0) 實際上就是 Share memory 。

該共享記憶體在第 11 行透過 p = Process(target=add_one, args=(num, )) 傳遞給 Child process 使得 Child process 能夠存取共享記憶體,經過 Child process 加 1 後,Parent process 在第 16 行讀取其值後印出 。

p.s. 關於 Value 的更多說明詳見 官方文件

from multiprocessing import Process, Value


def add_one(v):
    v.value += 1


num = Value('d', 0.0)


p = Process(target=add_one, args=(num, ))
p.start()
p.join()


print(num.value)

上述範例的執行結果:

1.0

那如果將上述的範例改成 Pool 形式,同時交由 99 個 Child processes 執行會如何呢?

from multiprocessing import Pool, Value


def add_one(v):
    v.value += 1


num = Value('d', 0.0)


with Pool(4) as pool:
    for _ in range(100):
        pool.apply_async(add_one, args=(num, ), error_callback=lambda e: print(e))
    pool.close()
    pool.join()


print(num.value)

如果執行上述範例,將會出現以下的錯誤,而 num.value 的值更非預期的 99.0 :

...(skip)...
Synchronized objects should only be shared between processes through inheritance
Synchronized objects should only be shared between processes through inheritance
0.0

原因在於共享記憶體 Value 本來就會由 Child processes 共享,在建立 Child processes 時, Value 也會一併被放到 Child processes 中供存取,所以不需要透過參數傳遞。

因此前述出問題的範例可以改成以下形式,將 add_one 改為不需傳遞參數:

from multiprocessing import Pool, Value


num = Value('d', 0.0)


def add_one():
    num.value += 1


with Pool(4) as pool:
    for _ in range(100):
        pool.apply_async(add_one, error_callback=lambda e: print(e))
    pool.close()
    pool.join()


print(num.value)

上述範例執行結果:

96.0

可以發現,上述結果竟然不是 99.0 ,這是由於發生 race condition 的情況,有些 process 讀取到的 n 是未被更新前的值,導致數值不正確。

p.s. 本文不贅述 race condition

如果要修正該錯誤,正確的方法是導入 Lock ,確保同一時間只有 1 個 process 能夠更新 num 的數值。

multiprocessing.Manager

除了自己實作 Lock 的機制外, Python 也提供 multiprocessing.Manager 讓我們輕鬆地使用 Lock ,以解決 race condition 的情況。

Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

如果想要提供 processes 之間的共享資料,而且 Manager 還能透過網路方式讓我們進行不同主機之間的共享資料,真的可以多多利用 Manager 。

因此前述總是有機會計算錯誤的 add_one 範例,我們可以利用 Manager 改為以下形式:

from multiprocessing import Pool, Manager


m = Manager()
n = m.Value('d', 0.0)
lock = m.Lock()


def add_one():
    lock.acquire()
    n.value += 1
    lock.release()


with Pool(4) as pool:
    for _ in range(1, 100):
        pool.apply_async(add_one, error_callback=lambda e: print(e))
    pool.close()
    pool.join()


print(n.value)

p.s. 其實 Manager() 是回傳 SyncManager

上述範例在第 4 行建立 Manager 的實例,由它為我們管理 processes 之間的共享資料,接著第 5 行新增 1 個共享資料 num 以及第 6 行的 Lock 。

再將 add_one 改為更新數值前必須先獲得 Lock ,也就是第 10 行。而更新完之後再釋放 Lock ,讓其他 process 也有機會更新數值,也就是第 12 行的部分。

如此,執行後的數值就會我們所預期的 99.0 囉!

以上就是 Python multiprocessing 的簡單說明與範例,有興趣的話可以詳閱 multiprocessing

Happy Coding!

References

multiprocessing — Process-based parallelism