用範例輕鬆學 Python multiprocessing 模組

Last updated on  Aug 8, 2023  in  Python 程式設計 - 初階  by  Amo Chen  ‐ 5 min read

Python 內建的 multiprocessing 是相當重要的模組,如果有平行(parallelism)處理的需求,除了內建的 threading 模組之外,另一個就屬 multiprocessing 。

使用 multiprocessing 的好處在於能夠很大程度避開 Python GIL 對於程式效能的影響,但壞處則是對於記憶體的耗用程度也較高,即便如此也是一個必須了解的模組。

本篇將實際透過幾個範例學習如何使用 multiprocessing 模組。

本文環境

  • Python 3.10

multiprocessing.Pool

Pool 是 multiprocessing 模組中相當方便的類別, Pool 提供簡單的方法,讓我們能夠定義 workers 的個數,也就是多少個平行處理的 Processes 數,例如 Pool(4) 則代表會有 4 個平行處理的 Processes 。

以下是使用 Pool 寄信給 100 位使用者的範例程式,假設每封信需要 1 秒寄出,那麼如果不使用平行處理,單以 for 迴圈處理,寄完 100 封信就需要 100 秒;如果以 4 個 workers 平行處理的話,理論上可以在 25 秒(100 / 4)左右處理完,這就是平行處理的威力:

from time import sleep, time
from multiprocessing import Pool


def send_mail(username, coupon_code, extra_sentence):
    print(username, coupon_code, extra_sentence or '')
    sleep(1)  # 模擬寄信需要 1 秒的時間


s_time = time()
with Pool(4) as pool:
    for idx in range(100):
        pool.apply_async(
            send_mail,
            (f'user{idx:0>2}', f'vipcode{idx:0>2}'),
            {'extra_sentence': f'{idx:0>4}'}
        )
    pool.close()
    pool.join()

print('花費時間:', time() - s_time)

上述範例在第 11 行建立 4 個 workers (也就是 4 個 processes)後,接著在第 12, 13 行的迴圈內將 send_email 函式以及呼叫 send_email 函式所需要的參數以 apply_async 的方式交給 pool 加到佇列(queue) 中等待執行。

接著 pool.close() 則是告訴 pool 沒有其他資料與任務需要加到佇列(queue) 中了。注意,此時所有的 Workers 並不會直接開始工作喔,而是直到 pool.join() 被呼叫之後,所有工作才會開始被執行。

以下是執行結果,可以看到花費時間約 26 秒,接近我們的預估值。

user01 vipcode01 0001
user02 vipcode02 0002
user03 vipcode03 0003
user05 vipcode05 0005
user06 vipcode06 0006
user07 vipcode07 0007
user04 vipcode04 0004
user08 vipcode08 0008
...
花費時間: 26.362775325775146

另外值得注意的是,上述結果中的數字並不是連續的,這是由於 Workers 執行速度不ㄧ所導致的,純屬正常現象,這也代表在平行處理的世界中,我們通常無法預測執行與結束的次序,如果你需要 100% 保證執行與結束的次序,那麼平行處理可能就不適合你使用。

Parent process 分享資料(shared memory) Child process

初體驗 Pool 的功能之後,有個觀念需要認識一下。

一個 Python 程序(假設為 A)如果利用 Pool/Process 產生出其他的程序(假設為 B, C, D)來協同工作,那麼 A 就是 parent process , B, C, D 則是 child processes 。

而 Processes 之間的記憶體基本上是獨立的,如果要溝通就得透過 IPC(Inter-process communication) 技術。

常見的 IPC 技術有 4 種:

  1. 透過檔案,例如 A Process 將資料寫至一個檔案後由 B Process 進行讀取,達成溝通的效果
  2. Socket ,透過 TCP/UDP 等網路協定進行溝通
  3. Shared memory ,透過共享記憶體區塊進行溝通
  4. Signal ,透過作業系統提供的信號機制進行溝通,例如 A Process 發出信號給 B Process , B Process 針對不同信號做出不同的處理,達成溝通的效果

當然, IPC 技術不只有上述 4 種,其他可以查閱 Inter-process communication - Wikipedia

Python 的 multiprocessing 模組也有提供 Shared memory 的方式讓我們能夠實作 Processes 之間的溝通,例如以下範例中的 Value('d', 0.0) 實際上就是共享記憶體(shared memory),該共享記憶體在第 11 行透過 p = Process(target=add_one, args=(num, )) 傳遞給 child process 使得 child process 能夠存取共享記憶體,經過 child process 加 1 後, parent process 在第 16 行讀取其值後印出 。

p.s. 關於 Value 的更多說明詳見 官方文件

from multiprocessing import Process, Value


def add_one(v):
    v.value += 1


num = Value('d', 0.0)


p = Process(target=add_one, args=(num, ))
p.start()
p.join()


print(num.value)

上述範例的執行結果:

1.0

那如果將上述的範例改成 Pool 形式,同時交由 100 個 child processes 執行會如何呢?

from multiprocessing import Pool, Value


def add_one(v):
    v.value += 1


num = Value('d', 0.0)


with Pool(4) as pool:
    for _ in range(100):
        pool.apply_async(add_one, args=(num, ), error_callback=lambda e: print(e))
    pool.close()
    pool.join()


print(num.value)

如果執行上述範例,將會出現以下的錯誤,而 num.value 的值也非預期的 100.0:

...(skip)...
Synchronized objects should only be shared between processes through inheritance
Synchronized objects should only be shared between processes through inheritance
0.0

原因在於 Python 啟動 1 個新的 child process 時會因為作業系統的不同,有不同的預設方法,這個被稱為 start method

目前有 3 種 start methods:

  1. spawn Unix 與 Windows 系統都支援, macOS 與 Windows 預設都使用這個方法啟動新的 child process 。簡單來說,這個方法會啟動 1 個幾乎新的 child process, 可以簡單理解為記憶體內的資料也是新的。
  2. fork 只有 Unix 系統支援,也是 Unux 預設支援的方法。這個方法會啟動 1 個與 parent process 相同的 child process, parent process 中的資源也會被新啟動的 child process 繼承(inherit)。
  3. forkserver 也是 Unix 系統支援的方法。基本上跟 fork 運作類似,不過 parent process 與 child process 之間多了 1 個 fork server 幫忙代理 fork child process 相關的事務。

如果想知道自己的 start method 是什麼,可以執行下列程式:

import multiprocessing
multiprocessing.get_start_method()

本文的 start method 為 fork, 如果你使用的是 Windows 系統後續的範例將會有問題,建議可以使用 Google Colab 執行,其他系統可以呼叫 multiprocessing.set_start_method('fork') 設定 start method 為 fork 。

使用 fork 的方法時,共享記憶體 Value 本來就會由 child processes 繼承,因此在建立 child processes 時, Value 也會一併被 child processes 繼承,所以不需要透過參數傳遞。

因此前述出問題的範例可以改成以下形式,將 add_one 改為不需傳遞參數:

from multiprocessing import Pool, Value


num = Value('d', 0.0)


def add_one():
    num.value += 1


with Pool(4) as pool:
    for _ in range(100):
        pool.apply_async(add_one, error_callback=lambda e: print(e))
    pool.close()
    pool.join()


print(num.value)

上述範例執行結果:

96.0

可以發現,上述結果竟然不是 100.0 ,這是由於發生 race condition 的情況,有些 process 讀取到的 n 是未被更新前的值,導致數值不正確。

p.s. 本文不贅述 race condition

如果要修正該錯誤,正確的方法是導入 Lock ,確保同一時間只有 1 個 process 能夠更新 num 的數值。

Python 的 multiprocessing.Value 有實作 lock, 所以改成下列範例修正 race condition 的問題,可以需要額外透過 with num.get_lock() 取得 lock 才能更改 num 的值:

from multiprocessing import Pool, Value, get_start_method, set_start_method



def add_one():
    with num.get_lock():
        num.value += 1


num = Value('d', 0.0)


with Pool(4) as pool:
    for _ in range(100):
        pool.apply_async(add_one, error_callback=lambda e: print(e))
    pool.close()
    pool.join()


print(num.value)

multiprocessing.Manager

前述範例的 lock 機制之外, Python 也提供 multiprocessing.Manager 讓我們輕鬆地使用 lock ,以解決 race condition 的情況。

Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

Manager 除了提供 processes 之間的共享資料,還提供能透過網路方式讓我們進行不同主機之間的共享資料的方法,有需要的話,真的可以多多利用 Manager 。

前述總是有機會計算錯誤的 add_one 範例,我們可以利用 Manager 改為以下形式:

from multiprocessing import Pool, Manager


m = Manager()
n = m.Value('d', 0.0)
lock = m.Lock()


def add_one():
    lock.acquire()
    n.value += 1
    lock.release()


with Pool(4) as pool:
    for _ in range(100):
        pool.apply_async(add_one, error_callback=lambda e: print(e))
    pool.close()
    pool.join()


print(n.value)

p.s. 其實 Manager() 是回傳 SyncManager

上述範例在第 4 行建立 Manager 的實例,由它為我們管理 processes 之間的共享資料,接著第 5 行新增 1 個共享資料 n 以及第 6 行的 lock 。

n 更新數值前必須先獲得 lock ,也就是第 10 行的部分。而更新完之後再釋放 lock ,讓其他 process 也有機會更新數值,也就是第 12 行的部分。

如此,執行後的數值就會我們所預期的 100.0 囉!

總結

以上就是 Python multiprocessing 的簡單說明與範例。本文謹作為初學指引,讓初學者能夠透過幾個簡單的範例體驗 multiprocessing 的奇妙之處,以理解何謂「平行」處理的概念,實際工作上用到 multiprocessing 的機會其實也不小,有興趣的話可以閱讀 multiprocessing 模組進階篇 或者詳閱 multiprocessing 官方文件,學習更多關於 multiprocessing 的相關知識。

Happy Coding!

References

multiprocessing — Process-based parallelism

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

追蹤新知

看完這篇文章了嗎?還意猶未盡的話,追蹤粉絲專頁吧!

我們每天至少分享 1 篇文章/新聞或者實用的軟體/工具,讓你輕鬆增廣見聞提升專業能力!如果你喜歡我們的文章,或是想了解更多特定主題的教學,歡迎到我們的粉絲專頁按讚、留言讓我們知道。你的鼓勵,是我們的原力!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!