用 Python 學網路程式設計重要概念 — 從多執行緒到 Multi-process, Pre-fork 再到 Multi-process 混搭 Multi-thread

Posted on  Jun 17, 2024  in  Python 程式設計 - 中階  by  Amo Chen  ‐ 8 min read

在這篇文章,我們將結合先前所學到的知識,更深入地探討網路程式設計中的一些重要概念,特別是使用 Python 從多執行緒架構開始,逐步過渡到 multi-process 和 pre-fork 技術,最終結合 multi-process 和 multi-thread,形成混合式架構。

通過這些過程,我們不僅能理解這些技術背後的原理,還能掌握網路程式設計中的核心技術,並將這些概念融會貫通,為開發高效能的網路應用打下堅實的基礎。

本文環境

  • Python 3

前言

本文是「用 Python 學網路程式設計重要概念,從 I/O 多工再到多執行緒以及 Thread Pool」 1 文的後續之作,強烈建議不理解多執行緒的讀者先閱讀該篇文章,以對本文有更好的理解能力。

本文將使用高階的 socketserver 模組,節省撰寫低階 socket 的程式碼的過程,將可以更專注 multi-process 的部份。

為什麼要使用 Multi-process?

用 Python 學網路程式設計重要概念,從 I/O 多工再到多執行緒以及 Thread Pool」 1 文中介紹多執行緒的概念並進行實作,不過多執行緒有 2 個主要缺點存在:

  • 如果 main process 出現問題,會導致其他執行緒也無法正常運作。
  • 這是 Python 特有的缺點)Python 的 GIL (Global Interpreter Lock) 限制會導致同一時間只有 1 個執行緒可以執行。這意味即使是多執行緒,它實際上同一時間只有 1 個執行緒會被 CPU 執行,只是由於 CPU 切換執行緒的速度非常快,我們感受起來仍像是多執行緒在同時執行。

關於第 1 點,我們可以用以下程式碼實驗:

import selectors
import socket
import socketserver
import threading
from queue import Queue


class HTTPRequestHandler(socketserver.StreamRequestHandler):
    def handle(self):
        data = self.rfile.readline().strip()
        if data.startswith(b"GET /favicon.ico"):
            self.wfile.write(
                b"HTTP/1.1 404 Not Found\n"
                b"Content-Type: text/html\n\n"
                b"<html><body><h1>404 Not Found</h1></body></html>\n"
            )
            return

        self.wfile.write(
            (
                b"HTTP/1.1 200 OK\n"
                b"Content-Type: text/html\n\n"
                b"<html>"
                b"<head><title>Hello, socketserver</title></head>"
                b"<body><h1>Hello, socketserver!</h1></body>"
                b"</html>\n"
            )
        )


class ThreadPoolHTTPServer(socketserver.ThreadingTCPServer):
    allow_reuse_address = True
    threads = 4

    def serve_forever(self, poll_interval=0.5):
        print(f'Server is running with number of threads: {self.threads}')

        # set up the threadpool
        self.requests = Queue(self.threads)
        for x in range(self.threads):
            t = threading.Thread(target = self.process_request_thread)
            t.setDaemon(1)
            t.start()

        # server main loop
        try:
            with selectors.SelectSelector() as selector:
                selector.register(self, selectors.EVENT_READ)
                count = 0
                while True:
                    ready = selector.select(poll_interval)
                    if ready:
                        if count == 2:
                            raise RuntimeError
                        self.handle_request()
                        count += 1
                    self.service_actions()
        finally:
            self.server_close()

    def process_request_thread(self):
        '''
        obtain request from queue instead of directly from server socket
        '''
        while True:
            q = self.requests.get()
            socketserver.ThreadingMixIn.process_request_thread(self, *q)
            self.requests.task_done()

    def handle_request(self):
        '''
        simply collect requests and put them on the queue for the workers.
        '''
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            self.requests.put((request, client_address))


if __name__ == "__main__":
    HOST, PORT = "127.0.0.1", 65432
    with ThreadPoolHTTPServer((HOST, PORT), HTTPRequestHandler) as server:
        print("Server started on", HOST, "port", PORT)
        server.serve_forever()

上述程式碼在 serve_forever() 方法中故意埋下接到第 2 個 request 就 raise RuntimeError 的炸彈,因此只要執行以下步驟就可以讓整個伺服器停止執行:

  1. 打開瀏覽器並輸入網址 http://127.0.0.1:65432
  2. 重新整理網頁 2 次

這其實也是 1 種單點故障(SPOF, Single point of failure),所以設計上我們會使用多個 processes 或稱 multi-process, 緩解 single process 架構可能因為故障導致服務無法正常運作的問題,只要還有 1 個 process 正常存活,仍有辦法提供服務(但前提是剩下的 process 還能承受所有壓力)。

使用 multi-process 還有 2 個主要優點,能夠提升程式的穩定性與效能:

  1. Multi-process 的記憶體空間是獨立的,不像 multi-thread 是共享 main process 的記憶體,所以 multi-process 之間基本不會互相干擾。
  2. Multi-process 可以繞開 Python 的 GIL 限制,所以多個 processes 在多核 CPU 的環境下可以同時執行。也由於 multi-process 可以很好的利用多核 CPU 運算能力,所以吃重 CPU 運算的 CPU 密集型(CPU-bound)任務,適合使用 multi-process; 而 I/O 密集型(I/O bound)的任務,例如網路傳輸、資料讀寫等可以交由 multi-thread 負責。

但 multi-process 的缺點是:

  1. 由於 multi-process 的每個 process 擁有獨立記憶體空間,所以資源開銷比較大,不像 multi-thread 可以共享記憶體空間。
  2. 跨 process 溝通(IPC, Inter-process Communication)相對複雜。 2 個 processes 之間溝通需要用到 Pipe, Network, Shared Memory 或 Queue 等手段/技術。相比之下, multi-thread 可以直接存取共享記憶體進行溝通。但兩者都可能會有 race condition 的情況存在。

從比較結果可以得知沒有完美的解決方案,不管使用 multi-thread 或者 multi-process 都有各自的優點與缺點。

知道 multi-process 可以解決什麼問題之後,我們同樣可以使用 Python 高階模組 socketserver 做 1 個 multi-process 版本的 HTTP server 。

用 socketserver.ForkingTCPServer 實作 Multi-process HTTP server

Python 的 socketserver 模組不僅實作多執行緒架構,也有實作 multi-process 架構,開發者只要使用 socketserver.ForkingTCPServer 即可,以下是改完之後的程式碼:

import socketserver


class HTTPRequestHandler(socketserver.StreamRequestHandler):
    def handle(self):
        data = self.rfile.readline().strip()
        if data.startswith(b"GET /favicon.ico"):
            self.wfile.write(
                b"HTTP/1.1 404 Not Found\n"
                b"Content-Type: text/html\n\n"
                b"<html><body><h1>404 Not Found</h1></body></html>\n"
            )
            return

        self.wfile.write(
            (
                b"HTTP/1.1 200 OK\n"
                b"Content-Type: text/html\n\n"
                b"<html>"
                b"<head><title>Hello, socketserver</title></head>"
                b"<body><h1>Hello, socketserver!</h1></body>"
                b"</html>\n"
            )
        )


if __name__ == "__main__":
    HOST, PORT = "127.0.0.1", 65432
    with socketserver.ForkingTCPServer((HOST, PORT), HTTPRequestHandler) as server:
        print("Server started on", HOST, "port", PORT)
        server.serve_forever()

上述程式碼僅改用 socketserver.ForkingTCPServer 作為 server, 就完成 multi-process 的功能,真的相當便捷!

但我們還是要深入研究一下 ForkingTCPServer 到底做了什麼事!

實際上, ForkingTCPServer 也是 1 個多重繼承的類別,其定義如下:

if hasattr(os, "fork"):
    class ForkingTCPServer(ForkingMixIn, TCPServer): pass

首先,當作業系統支援 fork() syscall 時,才會有 ForkingTCPServer 可以使用, POSIX 標準的作業系統都會有 fork() syscall, 例如 Linux, Unix 作業系統。

Users of macOS or users of libc or malloc implementations other than those typically found in glibc to date are among those already more likely to experience deadlocks running such code.

macOS 雖然也有 fork() 可以使用,不過官方文件提到 macOS 由於實作上的差異,所以 macOS 上的 os.fork() 很可能會遭遇 deadlock 的情況。

再來, ForkingTCPServer 的實作細節則隱藏在 ForkingMixIn 類別之中,所以我們其實要細看 ForkingMixIn 到底做了什麼事。

socketserver.ForkingMixIn 與 Multi-process

接著細看 ForkingMixIn程式碼,學習 Python 是如何實作 multi-process 架構,以下是 ForkingMixIn 會產生新的 process 處理 request 的關鍵程式碼:

def process_request(self, request, client_address):
    """Fork a new subprocess to process the request."""
    pid = os.fork()
    if pid:
        # Parent process
        if self.active_children is None:
            self.active_children = set()
        self.active_children.add(pid)
        self.close_request(request)
        return
    else:
        # Child process.
        # This must never return, hence os._exit()!
        status = 1
        try:
            self.finish_request(request, client_address)
            status = 0
        except Exception:
            self.handle_error(request, client_address)
        finally:
            try:
                self.shutdown_request(request)
            finally:
                os._exit(status)

重點說明上述程式碼。

ForkingMixInThreadingMixIn 一樣都是覆寫(override) process_request() 方法,只是 ForkingMixIn 在接到 request 時,就會先 Fork 出 1 個 subprocess, 也就是以下程式碼:

pid = os.fork()

上述程式碼執行後,就會有 2 個 processes 處於完全相同的狀態, 1 個稱為 parent process, 1 個則是 Child process, 這 2 個 processes 都會繼續執行剩下的程式碼,也就是以下區塊:

if pid:
   ...
else:
   ...

Fork a child process. Return 0 in the child and the child’s process id in the parent.

根據 Python 官方文件,如果是 child process, 上述 pid 必為 0, 所以它會執行 else 區塊,負責呼叫 RequestHandler 處理 request, 處理完 request 之後就呼叫 os._exit(status) 結束執行:

    else:
        # Child process.
        # This must never return, hence os._exit()!
        status = 1
        try:
            self.finish_request(request, client_address)
            status = 0
        except Exception:
            self.handle_error(request, client_address)
        finally:
            try:
                self.shutdown_request(request)
            finally:
                os._exit(status)

如果是 parent process, 上述 pid 則為 child process 的 pid, 所以它會執行 if pid: 區塊,將 child process 的 pid 加入到 1 個 set 之中,作為後續狀態追蹤的用途:

    if pid:
        # Parent process
        if self.active_children is None:
            self.active_children = set()
        self.active_children.add(pid)
        self.close_request(request)
        return

所以上述 if ... else ... 區塊其實都會執行到,只是 1 個在 parent process, 1 個在 child process 中執行!

透過這段程式碼,我們也能充分理解 fork() syscall 的用途,在呼叫 fork() syscall 時,作業系統會建立 1 個與當前 process 一模一樣的 process (稱為 child process),就連即將要執行的程式碼也都一樣,不過我們可以透過判斷 pid 將兩者的行為分開。

將 ForkingMixIn 處理 request 的方式畫成圖的話,如下所示:

forking-overview.png

這就是 socketserver 模組所使用的 multi-process 魔法!

ForkingMixIn 有限制 processes 數量嗎?

答案是:

有限制 fork 的 processes 數量,而且最多可以 fork 40 個 child processes 」(原始碼

不像 ThreadingMixIn 沒有限制執行緒的數量, ForkingMixIn 會限制 child process 的數量,這原因是每次 fork() 都會需要分配獨立的資源給 child process, 例如記憶體,如果不限制 child process 的數量,當短時間內湧入大量的 requests 時,很快就會耗光硬體資源,所以限制 child process 的數量是 1 個相當實務的做法!

開開關關也是一項開銷

有人可能會想說 child process 一直開開關關的話,是不是會影響伺服器的效率?

答案:「正確」!

這也是為何我們能看到一些伺服器實作上會預先把 child processes 先建立好,道理跟 thread pool 很相似,可以減少一些開開關關 child process 的開銷,這種手段被稱為 pre-fork, pre 代表是在 child processes 開始接收 request 之前先建立好。

要做到 pre-fork 也相當簡單,就是 parent process 開始接收 request 之前先呼叫 os.fork() 事先建立 child processes, 接著每個 processes 各自再進入能接收 request 的階段(對應到 socketserver 模組的話,則是呼叫 serve_forever() 方法)以下是使用 TCPServer 類別實作 pre-fork 的範例:

http_server.py

import os
import socketserver


class HTTPRequestHandler(socketserver.StreamRequestHandler):
    def handle(self):
        data = self.rfile.readline().strip()
        if data.startswith(b"GET /favicon.ico"):
            self.wfile.write(
                b"HTTP/1.1 404 Not Found\n"
                b"Content-Type: text/html\n\n"
                b"<html><body><h1>404 Not Found</h1></body></html>\n"
            )
            return
        pid = os.getpid()
        self.wfile.write(
            (
                b"HTTP/1.1 200 OK\n"
                b"Content-Type: text/html\n\n"
                b"<html>"
                b"<head><title>Hello, socketserver</title></head>"
                b"<body><h1>Hello, socketserver! PID: "+ str(pid).encode() + b"</h1></body>"
                b"</html>\n"
            )
        )

def pre_fork(server, num_processes):
    for _ in range(num_processes):
        pid = os.fork()
        if pid == 0:  # Child process
            server.serve_forever()
            os._exit(0)


if __name__ == "__main__":
    HOST, PORT = "127.0.0.1", 65432
    with socketserver.TCPServer((HOST, PORT), HTTPRequestHandler) as server:
        pre_fork(server, 3)
        print("Server started on", HOST, "port", PORT)
        server.serve_forever()

上述程式碼的重點在於 pre_fork(server, num_processes) 函式,該函式會呼叫 num_processes 次數的 os.fork() ,建立多個 child process, 由於 child process 得到的 pid 都是 0, 所以它們都會進入 if 區塊執行 serve_forever() 開始接受 requests 。

呼叫完 pre_fork(server, 3) 之後, parent process 也自己呼叫 serve_forever() 開始接受 requests 。

上述程式碼可以用以下指令執行:

$ python http_server.py

執行成功之後可以打開瀏覽器,網址輸入 http://127.0.0.1:65432/ 就可以看到類似以下畫面,各位可以試著重新整理網頁,可以發現都是固定 4 個 PID 在處理 requests (1 parent + 3 children):

pre-fork.png

以上就是 multi-process 與 pre-fork 的架構!

p.s. 可能有人會疑問 parent process 與 child processes 監聽同 1 個 port 會不會有問題?實際不會有問題,每 1 個 request 只會分配給其中 1 個 process 處理

等等,現在是 Multi-process 搭配單執行緒架構!?

截至目前爲止,我們的範例屬於 multi-process 搭配單執行緒架構,每個 process 都只有 1 個主執行緒。

Multi-process 搭配單執行緒其實還是會有執行緒可能被佔用的問題,如果想解決這個問題,可以在 multi-process 基礎上加上多執行緒的功能,變成每個 process 底下還是有多個執行緒可以提供服務,這種搭配完全不衝突:

forking-and-threading.png

本文 multi-process 範例結合「用 Python 學網路程式設計重要概念,從 I/O 多工再到多執行緒以及 Thread Pool」的 Thread Pool 範例,其程式碼如下:

http_server.py

import os
import selectors
import socket
import socketserver
import threading
from queue import Queue


class HTTPRequestHandler(socketserver.StreamRequestHandler):
    def handle(self):
        data = self.rfile.readline().strip()
        if data.startswith(b"GET /favicon.ico"):
            self.wfile.write(
                b"HTTP/1.1 404 Not Found\n"
                b"Content-Type: text/html\n\n"
                b"<html><body><h1>404 Not Found</h1></body></html>\n"
            )
            return
        thread_id = threading.get_ident()
        pid = os.getpid()
        self.wfile.write(
            (
                b"HTTP/1.1 200 OK\n"
                b"Content-Type: text/html\n\n"
                b"<html>"
                b"<head><title>Hello, socketserver</title></head>"
                b"<body><h1>Hello, socketserver! PID: "+ str(pid).encode() + b" | Thread ID:" + str(thread_id).encode() + b"</h1></body>"
                b"</html>\n"
            )
        )


class ThreadPoolHTTPServer(socketserver.ThreadingTCPServer):
    allow_reuse_address = True
    threads = 4

    def serve_forever(self, poll_interval=0.5):
        print(f'Server is running with number of threads: {self.threads}')

        # set up the threadpool
        self.requests = Queue(self.threads)
        for x in range(self.threads):
            t = threading.Thread(target = self.process_request_thread)
            t.setDaemon(1)
            t.start()

        # server main loop
        try:
            with selectors.SelectSelector() as selector:
                selector.register(self, selectors.EVENT_READ)
                while True:
                    ready = selector.select(poll_interval)
                    if ready:
                        self.handle_request()
                    self.service_actions()
        finally:
            self.server_close()

    def process_request_thread(self):
        '''
        obtain request from queue instead of directly from server socket
        '''
        while True:
            q = self.requests.get()
            socketserver.ThreadingMixIn.process_request_thread(self, *q)
            self.requests.task_done()

    def handle_request(self):
        '''
        simply collect requests and put them on the queue for the workers.
        '''
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            self.requests.put((request, client_address))


def pre_fork(server, num_processes):
    for _ in range(num_processes):
        pid = os.fork()
        if pid == 0:  # Child process
            server.serve_forever()
            os._exit(0)


if __name__ == "__main__":
    HOST, PORT = "127.0.0.1", 65432
    with ThreadPoolHTTPServer((HOST, PORT), HTTPRequestHandler) as server:
        print("Server started on", HOST, "port", PORT)
        pre_fork(server, 3)
        server.serve_forever()

眼尖的讀者應該可以發現上述程式碼只是多加了 pre_fork() 函數與呼叫 pre_fork(server, 3) 而已。

上述程式碼可以用以下指令執行:

$ python http_server.py

執行成功之後可以打開瀏覽器,網址輸入 http://127.0.0.1:65432/ 就可以看到類似以下畫面,各位可以試著重新整理網頁,可以發現都是固定 4 個 PID 在處理 requests (1 parent + 3 children)以及固定 16 個 Thread Id 在處理 requests 而已:

forking-and-thread-pool.png

Multi-process 與 multi-thread 是否相當簡單呢?

總結

本文接續前文從多執行緒架構出發,描述其可能遇到的問題,並以實際範例學習 multi-process 架構,並認識何謂 fork 與 pre-fork 技術,最後再綜合起來,將 multi-process 與 multi-thread 結合在一起,變成混合式的架構!相信這一路的演進,不僅可以讓大家認識網路程式設計中的幾大重要概念與 syscall, 還可以將這些概念融會貫通,從而對各種伺服器處理 request 的架構有所認識與掌握。

以上!

Enjoy!

References

socketserver — A framework for network servers

極簡說明Multi-thread/Multi-process、CPU-bound/IO-bound和GIL

Multi-Process(多行程)&Multi-Thread(多執行序)到底是個啥(1)

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!