Advanced Guide to Python's multiprocessing Module, with Examples

Posted on  May 1, 2021  in  Python Programming - Intermediate  by  Amo Chen  ‐ 11 min read

The earlier post Python multiprocessing 模組簡單說明與範例 gave a brief introduction to Pool and Manager in the multiprocessing module and showed how to share data between multiple processes. However, it left out many other multiprocessing features, such as the Process class, Queue, and Pipe. These features are both important and practical, and they are must-learns on the way to advanced Python, so they should not be overlooked.

This post walks through more of the practical features of the multiprocessing module with concrete examples.

Environment

  • Python 3.7
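
Note that all of the examples in this post create child processes at the top level of the script, which works when multiprocessing uses the fork start method (the default on Linux, and on macOS for Python 3.7). On platforms that use the spawn start method (e.g. Windows), the code that creates processes should be wrapped in an if __name__ == '__main__': guard, otherwise each spawned child re-imports the module and tries to create processes again. A minimal sketch of the guard (the job function is only for illustration):

from multiprocessing import Process


def job():
    print('hello from a child process')


if __name__ == '__main__':
    # Only executed when the script is run directly, not when a
    # spawned child process re-imports this module
    p = Process(target=job)
    p.start()
    p.join()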

The Process class

In practice, most parallel-processing needs can be handled by turning the parallel part into a function and handing it to a Pool for execution.

Still, there are a few situations where Pool is not a good fit, for example when we only need a single process doing some work in the background to get an asynchronous-like effect.

In that case, consider the Process class.

Simply inherit from Process and implement the run() method, and the work you want done in the background will run in another process. For example, the following implements a process that checks a website every 3 seconds and stops after 5 checks:

import time

import requests

from multiprocessing import Process, current_process


class ChildProcess(Process):
    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        for _ in range(5):
            resp = requests.get('https://example.com')
            print('example.com [%d]' % resp.status_code) 
            time.sleep(3)
        print("[%s] %s terminated" % (p.pid, p.name))

Once ChildProcess has been implemented by inheriting from Process, it gains the ability to run in a separate process: after we call its start() method, ChildProcess executes in another process.

In the following example we instantiate the class and then call start(), which makes ChildProcess run its run() method in a separate process. To show that ChildProcess does not block the main process, the main process then carries on doing other work in a loop of its own:

import time

import requests

from multiprocessing import Process, current_process


class ChildProcess(Process):
    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        for _ in range(5):
            resp = requests.get('https://example.com')
            print('example.com [%d]' % resp.status_code) 
            time.sleep(3)
        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

child_p = ChildProcess()
child_p.start()

for _ in range(5):
    print("I'm main process, doing a job now")
    time.sleep(2)

print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

The output below shows 2 processes with different PIDs each doing their own work without interfering with each other. The main process (51090) finishes its work and ends first, and the whole Python program only exits after the child process (51106) finishes its website-monitoring task:

$ python test.py
I'm main process -> [51090] MainProcess
I'm main process, doing a job now
New process -> [51106] ChildProcess-1
example.com [200]
I'm main process, doing a job now
I'm main process, doing a job now
example.com [200]
I'm main process, doing a job now
example.com [200]
I'm main process, doing a job now
[51090] MainProcess terminated
example.com [200]
example.com [200]
[51106] ChildProcess-1 terminated

That is how to run work in a separate process by subclassing Process.
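
Incidentally, for simple jobs that do not need to keep their own state, Process can also be used without subclassing by passing a callable to its target argument. Below is a minimal sketch of the same monitoring job written in that style (the monitor function, its parameters, and the URL are only for illustration):

import time

import requests

from multiprocessing import Process


def monitor(url, times=5, interval=3):
    # Same monitoring job as above, written as a plain function
    for _ in range(times):
        resp = requests.get(url)
        print('%s [%d]' % (url, resp.status_code))
        time.sleep(interval)


child_p = Process(target=monitor, args=('https://example.com',))
child_p.start()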

The join() method

In the previous section the main process finished before the child process. If you need to wait for the child process to finish first, call join() on the child process; the main process will then block at the join() call and only continue after the child process has ended. For example:

import time

from multiprocessing import Process, current_process


class ChildProcess(Process):
    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        time.sleep(10)
        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

child_p = ChildProcess()
child_p.start()
child_p.join()

print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

The output is shown below. No matter how many times you run it, MainProcess always waits for the child process to finish before it ends:

$ python test.py
I'm main process -> [13420] MainProcess
New process -> [13436] ChildProcess-1
[13436] ChildProcess-1 terminated
[13420] MainProcess terminated

If you remove child_p.join(), you will see that MainProcess always finishes its own work first (its terminated message is printed before the child's).

That is what the join() method does.
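
It is also worth knowing that join() accepts an optional timeout in seconds; if the child process has not finished within that time, join() simply returns and you can check is_alive() to decide what to do next. A minimal sketch, reusing the ChildProcess defined above:

child_p = ChildProcess()
child_p.start()

# Wait at most 3 seconds; join() returns even if the child is still running
child_p.join(timeout=3)

if child_p.is_alive():
    print('child process is still running, keep waiting')
    child_p.join()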

Daemon flag

The earlier post Python daemon thread 解說 explained daemon threads, i.e. whether the main process needs to wait for threads to finish before it can exit. Likewise, there are daemon processes.

By default the main process waits for its child processes to finish before it exits, which is why, with or without join(), the main process only terminates after all child processes have finished running.

If you want the main process to exit immediately without waiting for the child process, ending all processes, just set daemon to True, as in the following example where ChildProcess is created with daemon=True:

import time

from multiprocessing import Process, current_process


class ChildProcess(Process):
    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        time.sleep(10)
        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

child_p = ChildProcess(daemon=True)
child_p.start()

time.sleep(3)
print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

p.s. the example above does not call join()

The output is shown below; as soon as the main process ends, the child process is forcibly terminated as well:

$ python test.py
I'm main process -> [51895] MainProcess
New process -> [51911] ChildProcess-1
[51895] MainProcess terminated

Note that when join() is called explicitly, the main process still waits for the child process to finish even if daemon is set to True:

import time

from multiprocessing import Process, current_process


class ChildProcess(Process):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        time.sleep(10)
        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

child_p = ChildProcess(daemon=True)
child_p.start()
child_p.join()

print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

The output below shows the main process indeed waiting for the child process to finish before exiting:

$ python test.py
I'm main process -> [14612] MainProcess
New process -> [14628] ChildProcess-1
[14628] ChildProcess-1 terminated
[14612] MainProcess terminated

Multiple processes

Once you understand how to create a new process by subclassing Process, creating several processes at once is easy. The following example uses a for loop to create 3 processes:

import time

from multiprocessing import Process, current_process


class ChildProcess(Process):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        time.sleep(10)
        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

processes = []
for _ in range(3):
    child_p = ChildProcess()
    child_p.start()
    processes.append(child_p)

print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

The output is shown below; you can see the execution records of the 3 additional child processes:

$ python test.py
I'm main process -> [15809] MainProcess
New process -> [15825] ChildProcess-1
[15809] MainProcess terminated
New process -> [15826] ChildProcess-2
New process -> [15827] ChildProcess-3
[15825] ChildProcess-1 terminated
[15826] ChildProcess-2 terminated
[15827] ChildProcess-3 terminated
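
Since every Process object was kept in the processes list, the main process could also wait for all of them by calling join() on each one before its final print, if that is the behaviour you want. A minimal sketch:

for child_p in processes:
    child_p.join()

print("All child processes finished")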

Encapsulating complex logic

Compared with the simple and convenient Pool, the Process class is better suited to encapsulating complex logic inside a class, letting the process benefit from everything a class provides: it can keep state in instance attributes and implement several behaviours as needed, rather than being limited to a single function. In the following example, ChildProcess takes a parameter, stores it in the instance attribute self.config, and uses it to change the behaviour of its run() method:

import time

from multiprocessing import Process, current_process


class ChildProcess(Process):
    def __init__(self, *args, **kwargs):
        self.config = kwargs.pop('config', {})
        super().__init__(*args, **kwargs)

    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        sleep = self.config.get('sleep', 0)
        print("[%s] %s will sleep for %f seconds" % (p.pid, p.name, sleep))
        time.sleep(sleep)
        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

processes = []
for s in range(3):
    child_p = ChildProcess(config={'sleep': s})
    child_p.start()
    processes.append(child_p)

print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

The output below shows each child process behaving differently according to its config, sleeping for 0, 1 and 2 seconds respectively:

$ python test.py
I'm main process -> [18414] MainProcess
New process -> [18430] ChildProcess-1
[18430] ChildProcess-1 will sleep for 0.000000 seconds
[18430] ChildProcess-1 terminated
[18414] MainProcess terminated
New process -> [18431] ChildProcess-2
[18431] ChildProcess-2 will sleep for 1.000000 seconds
New process -> [18432] ChildProcess-3
[18432] ChildProcess-3 will sleep for 2.000000 seconds
[18431] ChildProcess-2 terminated
[18432] ChildProcess-3 terminated

Communication between processes

With multiple processes comes the question of how they communicate. The multiprocessing module currently provides 2 ways for processes to talk to each other:

  • Queues
  • Pipe

Used well, they let processes cooperate, like an assembly line in a factory working together to complete a complex task.

Queues

For one-way communication, for example passing data from the main process to child processes for processing, a Queue works well. In the following example we create a Queue and pass it into each child process as a parameter, where it is stored in self.queue, so every child process can keep calling self.queue.get() inside run(self) to receive data from the main process. Finally, so that every child process can terminate, the main process puts one None into the Queue per child process; each child process breaks out of its while loop when it receives a None:

import time

from multiprocessing import Process, Queue, current_process


class ChildProcess(Process):
    def __init__(self, *args, **kwargs):
        self.queue = kwargs.pop('queue')
        super().__init__(*args, **kwargs)

    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        while True:
            job = self.queue.get()
            if job is None:
                print("[%s] %s got None" % (p.pid, p.name))
                break
            print("[%s] %s got a job %d" % (p.pid, p.name, job))
        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

queue = Queue()
processes = []
processes_n = 3
for s in range(processes_n):
    child_p = ChildProcess(queue=queue)
    child_p.start()
    processes.append(child_p)

for n in range(10):
    queue.put(n)

for _ in range(processes_n):
    queue.put(None)

print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

p.s. only one child process can receive any given item, so different processes never end up with the same piece of data

The output below shows the main process successfully passing data to the child processes through the Queue, with no process receiving duplicate data. Note that the number of items each process gets from the Queue is not fixed; it depends entirely on how fast each child process runs. In this run all the data was consumed by ChildProcess-1 and ChildProcess-2, so ChildProcess-3 exited without ever receiving any data:

$ python test.py
I'm main process -> [24791] MainProcess
New process -> [24807] ChildProcess-1
New process -> [24808] ChildProcess-2
[24807] ChildProcess-1 got a job 0
[24791] MainProcess terminated
[24807] ChildProcess-1 got a job 2
[24807] ChildProcess-1 got a job 3
[24808] ChildProcess-2 got a job 1
[24807] ChildProcess-1 got a job 4
[24808] ChildProcess-2 got a job 5
[24807] ChildProcess-1 got a job 6
[24808] ChildProcess-2 got a job 7
[24807] ChildProcess-1 got a job 8
[24808] ChildProcess-2 got a job 9
[24807] ChildProcess-1 got None
[24807] ChildProcess-1 terminated
[24808] ChildProcess-2 got None
[24808] ChildProcess-2 terminated
New process -> [24809] ChildProcess-3
[24809] ChildProcess-3 got None
[24809] ChildProcess-3 terminated

Once you can do one-way communication with a Queue, two-way communication only requires adding a second Queue so the child processes can put their results back on it.

In the following example we create 2 Queues for two-way communication and pass both into ChildProcess, so it can receive data through in_queue and send results out through out_queue; the main process then collects the results with a loop that calls out_queue.get():

import time

from multiprocessing import Process, Queue, current_process


class ChildProcess(Process):
    def __init__(self, *args, **kwargs):
        self.in_queue = kwargs.pop('in_queue')
        self.out_queue = kwargs.pop('out_queue')
        super().__init__(*args, **kwargs)

    def square(self, n):
        return n**2

    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        while True:
            job = self.in_queue.get()
            if job is None:
                print("[%s] %s got None" % (p.pid, p.name))
                break
            print("[%s] %s got a job %d" % (p.pid, p.name, job))
            self.out_queue.put(self.square(job))
        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

in_queue = Queue()
out_queue = Queue()
processes = []
processes_n = 3
for s in range(processes_n):
    child_p = ChildProcess(in_queue=in_queue, out_queue=out_queue)
    child_p.start()
    processes.append(child_p)

for n in range(10):
    in_queue.put(n)

for _ in range(processes_n):
    in_queue.put(None)

for _ in range(10):
    print("Result: %d" % out_queue.get())

print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

The output is as follows:

I'm main process -> [25841] MainProcess
New process -> [25857] ChildProcess-1
New process -> [25858] ChildProcess-2
[25857] ChildProcess-1 got a job 0
[25858] ChildProcess-2 got a job 1
[25857] ChildProcess-1 got a job 2
Result: 0
[25858] ChildProcess-2 got a job 3
Result: 1
[25857] ChildProcess-1 got a job 4
New process -> [25859] ChildProcess-3
Result: 9
[25858] ChildProcess-2 got a job 5
Result: 4
Result: 16
[25857] ChildProcess-1 got a job 6
Result: 25
Result: 36
[25859] ChildProcess-3 got a job 7
[25858] ChildProcess-2 got a job 8
[25857] ChildProcess-1 got a job 9
Result: 64
[25858] ChildProcess-2 got None
[25858] ChildProcess-2 terminated
Result: 81
[25857] ChildProcess-1 got None
[25857] ChildProcess-1 terminated
Result: 49
[25859] ChildProcess-3 got None
[25841] MainProcess terminated
[25859] ChildProcess-3 terminated

Pipes

Another way for processes to communicate is a Pipe. Unlike the one-way use of a Queue shown above, a Pipe supports two-way communication: calling Pipe() returns 2 Connection objects representing the two ends of the pipe, like the two openings of a water pipe. Both ends can send and receive data (this is called duplex), so the main process and the child process can communicate in both directions through it.

In the following example we create a pipe and name its two connections parent_conn and child_conn, meaning one end of the pipe belongs to the main process and the other to the child process. We send 10 into the pipe through parent_conn, so the child must receive it on child_conn, which is what run() does; after the computation, the child sends the result back into the pipe through child_conn, so the main process reads the result from the other end with parent_conn.recv():

from multiprocessing import Process, Pipe, current_process


class ChildProcess(Process):
    def __init__(self, *args, **kwargs):
        self.child_conn = kwargs.pop('child_conn', None)
        super().__init__(*args, **kwargs)

    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        job = self.child_conn.recv()
        print("[%s] %s got a job %d" % (p.pid, p.name, job))
        self.child_conn.send(job * 2)
        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

parent_conn, child_conn = Pipe()
parent_conn.send(10)
print('send 10 to pipe')

child_p = ChildProcess(child_conn=child_conn)
child_p.start()
print('Result:', parent_conn.recv())

print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

The output is as follows:

$ python test.py
I'm main process -> [57958] MainProcess
send 10 to pipe
New process -> [57974] ChildProcess-1
[57974] ChildProcess-1 got a job 10
[57974] ChildProcess-1 terminated
Result: 20
[57958] MainProcess terminated

With the basics of Pipe covered, the earlier two-Queue bidirectional example can be rewritten as follows: we create a separate pipe for each child process, spread the data evenly across the pipes, collect the results, and finally tell the child processes to stop by sending None down each pipe:

import time

from multiprocessing import Process, Pipe, current_process


class ChildProcess(Process):
    def __init__(self, *args, **kwargs):
        self.child_conn = kwargs.pop('child_conn', None)
        super().__init__(*args, **kwargs)

    def square(self, n):
        return n**2

    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        while True:
            try:
                job = self.child_conn.recv()
            except EOFError:
                continue
            if job is None:
                print("[%s] %s got None" % (p.pid, p.name))
                break
            print("[%s] %s got a job %d" % (p.pid, p.name, job))
            self.child_conn.send(self.square(job))
        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

processes = []
processes_n = 3
for s in range(processes_n):
    parent_conn, child_conn = Pipe()
    child_p = ChildProcess(child_conn=child_conn)
    child_p.start()
    processes.append((parent_conn, child_conn, child_p))

for n in range(10):
    parent_conn, _, _ = processes[n % processes_n]
    parent_conn.send(n)

for n in range(10):
    parent_conn, _, _ = processes[n % processes_n]
    print('Result:', parent_conn.recv())

for n in range(processes_n):
    parent_conn, _, _ = processes[n % processes_n]
    parent_conn.send(None)

print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

The output is as follows:

$ python test.py
I'm main process -> [58083] MainProcess
New process -> [58099] ChildProcess-1
[58099] ChildProcess-1 got a job 0
[58099] ChildProcess-1 got a job 3
[58099] ChildProcess-1 got a job 6
[58099] ChildProcess-1 got a job 9
New process -> [58100] ChildProcess-2
Result: 0
[58100] ChildProcess-2 got a job 1
[58100] ChildProcess-2 got a job 4
[58100] ChildProcess-2 got a job 7
Result: 1
New process -> [58101] ChildProcess-3
[58101] ChildProcess-3 got a job 2
[58101] ChildProcess-3 got a job 5
Result: 4
[58101] ChildProcess-3 got a job 8
Result: 9
Result: 16
Result: 25
Result: 36
Result: 49
Result: 64
Result: 81
[58100] ChildProcess-2 got None
[58099] ChildProcess-1 got None
[58083] MainProcess terminated
[58099] ChildProcess-1 terminated
[58100] ChildProcess-2 terminated
[58101] ChildProcess-3 got None
[58101] ChildProcess-3 terminated

Note that here we must create a separate pipe for each child process; unlike a Queue, a pipe cannot simply be shared. The reason is that if several processes send data on the same end of a shared pipe at the same time, the data in the pipe may become corrupted:

Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

If you want to see the data corruption for yourself, run the following example:

import time

from multiprocessing import Process, Pipe, current_process


class ChildProcess(Process):
    def __init__(self, *args, **kwargs):
        self.child_conn = kwargs.pop('child_conn', None)
        super().__init__(*args, **kwargs)

    def square(self, n):
        return n**2

    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        while True:
            try:
                job = self.child_conn.recv()
            except EOFError:
                continue
            if job is None:
                print("[%s] %s got None" % (p.pid, p.name))
                break
            print("[%s] %s got a job %d" % (p.pid, p.name, job))
            self.child_conn.send(self.square(job))
        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

processes = []
processes_n = 3
parent_conn, child_conn = Pipe()
for s in range(processes_n):
    child_p = ChildProcess(child_conn=child_conn)
    child_p.start()
    processes.append((parent_conn, child_conn, child_p))

for n in range(10):
    parent_conn, _, _ = processes[n % processes_n]
    parent_conn.send(n)

for n in range(10):
    parent_conn, _, _ = processes[n % processes_n]
    print('Result:', parent_conn.recv())

for n in range(processes_n):
    parent_conn, _, _ = processes[n % processes_n]
    parent_conn.send(None)

print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

Running the example above, you may run into several different errors similar to the ones below, all of them problems that can occur when a pipe end is shared:

  • Error 1
Traceback (most recent call last):
  File "/.../.pyenv/versions/3.7/lib/python3.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "mp.py", line 19, in run
    job = self.child_conn.recv()
  File "/.../.pyenv/versions/3.7/lib/python3.7/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
_pickle.UnpicklingError: unpickling stack underflow
  • Error 2
I'm main process -> [65023] MainProcess
New process -> [65039] ChildProcess-1
New process -> [65040] ChildProcess-2
Process ChildProcess-2:
Traceback (most recent call last):
  File "/.../.pyenv/versions/3.7/lib/python3.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "test.py", line 19, in run
    job = self.child_conn.recv()
  File "/.../.pyenv/versions/3.7/lib/python3.7/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
_pickle.UnpicklingError: invalid load key, '\x05'.
New process -> [65041] ChildProcess-3

Shared memory

Python multiprocessing 模組簡單說明與範例 一文中提到的一樣,繼承 Process 類別的子類別,同樣能夠透過共享記憶體的方式存取資料,不過記得使用 lock 避免 race condition 的情況,以下是使用 shared memory 的範例,該範例於第 30 行新增 1 個整數型的共享記憶體,於第 34 行以傳參數的方式傳給 ChildProcess 使其能夠存取該共享記憶體,最後在第 21 行取得 lock 並將數值加到至該共享記憶體中:

import time

from multiprocessing import Process, Queue, Value, current_process


class ChildProcess(Process):
    def __init__(self, *args, **kwargs):
        self.in_queue = kwargs.pop('in_queue')
        self.n = kwargs.pop('n')
        super().__init__(*args, **kwargs)

    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        while True:
            job = self.in_queue.get()
            if job is None:
                print("[%s] %s got None" % (p.pid, p.name))
                break
            print("[%s] %s got a job %d" % (p.pid, p.name, job))
            with self.n.get_lock():
                self.n.value += job
        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

in_queue = Queue()
shared_n = Value('i', 0)
processes = []
processes_n = 3
for s in range(processes_n):
    child_p = ChildProcess(in_queue=in_queue, n=shared_n)
    child_p.start()
    processes.append(child_p)

for i in range(10):
    in_queue.put(i)

for _ in range(processes_n):
    in_queue.put(None)

print("Result: %d" % shared_n.value)

print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

The output is shown below. Summing 0 through 9 should give 45, yet the result printed is only 1, and it can differ on every run. Why is that?

$ python test.py
I'm main process -> [44685] MainProcess
New process -> [44701] ChildProcess-1
New process -> [44702] ChildProcess-2
[44701] ChildProcess-1 got a job 0
[44702] ChildProcess-2 got a job 1
[44701] ChildProcess-1 got a job 2
[44702] ChildProcess-2 got a job 3
[44701] ChildProcess-1 got a job 4
Result: 1
[44702] ChildProcess-2 got a job 5
[44701] ChildProcess-1 got a job 6
[44685] MainProcess terminated
[44702] ChildProcess-2 got a job 7
[44701] ChildProcess-1 got a job 8
[44702] ChildProcess-2 got a job 9
New process -> [44703] ChildProcess-3
[44701] ChildProcess-1 got None
[44701] ChildProcess-1 terminated
[44702] ChildProcess-2 got None
[44702] ChildProcess-2 terminated
[44703] ChildProcess-3 got None
[44703] ChildProcess-3 terminated

The reason is that the main process reaches its final print before all the child processes have finished, so the real result has not been computed yet; what gets printed is simply whatever the shared value happens to be at the moment the main process gets there.

To get the correct answer, add a for loop before print("Result: %d" % shared_n.value):

for p in processes:
    p.join()

This waits for all the child processes to finish before printing the answer.

Alternatively, you can use JoinableQueue, introduced next.

JoinableQueue

JoinableQueue differs from a plain Queue in that it offers 2 extra methods:

  • task_done()
  • join()

task_done() tells the JoinableQueue that one item taken from the queue has been fully processed.

join() blocks until every item in the JoinableQueue has had task_done() called for it.

In other words, for every put() that places an item on the JoinableQueue, there must be a matching task_done() call to mark that item as processed; only then will join() stop waiting.

So in the shared-memory example above, we can change in_queue to in_queue = JoinableQueue(), call task_done() explicitly in run() after each get() to tell the JoinableQueue the item has been handled, and then have the main process call in_queue.join() to wait until all the data has been processed before reading the result:

from multiprocessing import Process, JoinableQueue, Value, current_process


class ChildProcess(Process):
    def __init__(self, *args, **kwargs):
        self.in_queue = kwargs.pop('in_queue')
        self.n = kwargs.pop('n')
        super().__init__(*args, **kwargs)

    def run(self):
        p = current_process()
        print("New process -> [%s] %s" % (p.pid, p.name))
        while True:
            job = self.in_queue.get()
            self.in_queue.task_done()
            if job is None:
                print("[%s] %s got None" % (p.pid, p.name))
                break
            print("[%s] %s got a job %d" % (p.pid, p.name, job))
            with self.n.get_lock():
                self.n.value += job

        print("[%s] %s terminated" % (p.pid, p.name))


parent_p = current_process()
print("I'm main process -> [%s] %s" % (parent_p.pid, parent_p.name))

in_queue = JoinableQueue()
shared_n = Value('i', 0)
processes = []
processes_n = 3
for s in range(processes_n):
    child_p = ChildProcess(in_queue=in_queue, n=shared_n)
    child_p.start()
    processes.append(child_p)

for i in range(10):
    in_queue.put(i)

for _ in range(processes_n):
    in_queue.put(None)

in_queue.join()
print("Result: %d" % shared_n.value)

print("[%s] %s terminated" % (parent_p.pid, parent_p.name))

The output below shows that JoinableQueue also yields the correct result:

$ python test.py
I'm main process -> [45447] MainProcess
New process -> [45463] ChildProcess-1
New process -> [45464] ChildProcess-2
[45463] ChildProcess-1 got a job 0
[45464] ChildProcess-2 got a job 1
[45463] ChildProcess-1 got a job 2
[45464] ChildProcess-2 got a job 3
[45463] ChildProcess-1 got a job 4
[45464] ChildProcess-2 got a job 5
[45463] ChildProcess-1 got a job 6
[45464] ChildProcess-2 got a job 7
[45463] ChildProcess-1 got a job 8
[45464] ChildProcess-2 got a job 9
[45463] ChildProcess-1 got None
[45463] ChildProcess-1 terminated
[45464] ChildProcess-2 got None
[45464] ChildProcess-2 terminated
New process -> [45465] ChildProcess-3
[45465] ChildProcess-3 got None
[45465] ChildProcess-3 terminated
Result: 45
[45447] MainProcess terminated

Summary

This post is meant as a guided walkthrough, using several examples to cover the more important classes and features of the multiprocessing module. Some topics could not be covered in detail here for reasons of length, so the official documentation is recommended for further reading.

References

https://docs.python.org/3/library/multiprocessing.html
