用 Python 學網路程式設計重要概念 — 從 I/O 多工再到多執行緒以及 Thread Pool

Posted on  Jun 14, 2024  in  Python 程式設計 - 中階  by  Amo Chen  ‐ 8 min read

在「用 Python 學網路程式設計重要概念,從單執行緒到 I/O 多工(I/O multiplexing)」 1 文中,我們利用多個 Python 範例程式學會 I/O 多工的 3 種 syscall, 分別是 select() , poll() 以及 epoll() ,藉由 I/O 多工讓程式即使只有單執行緒也能同時處理多個連線。

本文將從 I/O 多工開始,進一步延伸學習多執行緒以及 Thread Pool 這 2 個經典的網路程式設計架構。

本文環境

  • Python 3

前言

本文是「用 Python 學網路程式設計重要概念,從單執行緒到 I/O 多工(I/O multiplexing)」 1 文的後續之作,強烈建議不理解 I/O 多工的讀者先閱讀該篇文章,以對本文有更好的理解能力。

因為教學的緣故,為求讓大家知道撰寫 socket 伺服器,實際處理多個連線時會遇到什麼問題,所以前述文章使用的是低階的 socket 模組

本文將使用高階的 socketserver 模組,節省撰寫低階 socket 的程式碼的過程,將可以更專注多執行緒的部份。

socketserver 模組簡介

socketserver 是 Python 1 個內建模組,主要用來簡化撰寫 Socket Server 的工作,因此在官網中也宣稱它是一個方便的 Socket Server Framework 。

The socketserver module simplifies the task of writing network servers.

通常在開發網路應用時,不外乎使用 TCP, UDP 等協定,因此 socketserver 模組提供 4 種基本的 Server 種類,分別是:

  1. TCPServer
  2. UDPServer
  3. UnixStreamServer
  4. UnixDatagramServer

這些 Server 都是 Python 類別,這些類別也已經寫好各種接受連線、關閉連線等基本功能, 作為開發者的我們,只需要專注在撰寫如何處理要求的程式碼即可,也就是所謂的 RequestHandlerClass, 剩下的部分都可以交由 socketserver 模組幫忙處理。

更多關於 socketserver 模組請閱讀「Python socketserver 模組 — 方便的 Socket Server Framework 使用教學」。

本文將會使用 TCPServer 實作 1 個極簡單的 HTTP server 。

socketserver 處理 request 的流程

在實際撰寫 HTTP server 之前,先了解一下 socketserver 處理 request 的流程。

從大處著眼的話, socketserver 將網路應用分為 Server 與 RequestHandler 2 個部分:

  1. Server
  2. RequestHandler

Server 負責處理接受連線、關閉連線等基本共通的事務,而 RequestHandler 則只負責處理要求,也是開發者必須提供的部分,以下畫成圖表示:

socketserver-overview.png

簡單的解釋是 Sever 端會負責把 request 傳給我們寫的 RequsetHandler 處理。

雖然圖片看起來很簡單,不過實際上 socketserver 所提供的 Server 類別已經幫開發者處理掉很多繁瑣的事情,舉 TCPServer 為例, TCPServer 整個流程做掉這些事(原始碼):

tcpserver-flow.png

上圖中,僅有綠色部分是開發者需要實作的,其他藍色部分都由 TCPServer 自行處理,明顯可以感受到使用 socketserver 的好處,開發者僅需要專注於如何處理 request 即可。

簡單的 HTTP Server

接著,我們用 socketserver 模組做 1 個 HTTP server, 以下是其程式碼:

http_server.py

import socketserver


class HTTPRequestHandler(socketserver.StreamRequestHandler):
    def handle(self):
        data = self.rfile.readline().strip()
        if data.startswith(b"GET /favicon.ico"):
            self.wfile.write(
                b"HTTP/1.1 404 Not Found\n"
                b"Content-Type: text/html\n\n"
                b"<html><body><h1>404 Not Found</h1></body></html>\n"
            )
            return

        self.wfile.write(
            (
                b"HTTP/1.1 200 OK\n"
                b"Content-Type: text/html\n\n"
                b"<html>"
                b"<head><title>Hello, socketserver</title></head>"
                b"<body><h1>Hello, socketserver!</h1></body>"
                b"</html>\n"
            )
        )


if __name__ == "__main__":
    HOST, PORT = "127.0.0.1", 65432
    with socketserver.TCPServer((HOST, PORT), HTTPRequestHandler) as server:
        print("Server started on", HOST, "port", PORT)
        server.serve_forever()

上述程式碼是 1 個極簡單的 HTTP server, HTTPRequestHandler 主要用來處理讀取 HTTP request 以及回應 HTTP response 的部分,當收到 GET /favicon.ico 的 request 時, HTTP server 就會回應 404 狀態碼以及相對應的 HTML 內容,除此之外,一律回應 200 狀態碼以及一份 HTML 內容。

啟動以 socketserver 模組開發的 HTTP server 的指令如下:

$ python http_server.py

執行成功的話,使用瀏覽器輸入網址 http://127.0.0.1:65432 將可以看到以下畫面:

hello-socketserver.png

socketserver 模組預設使用 I/O 多工處理 requests

在「用 Python 學網路程式設計重要概念,從單執行緒到 I/O 多工(I/O multiplexing)」 1 文中,我們用 I/O 多工的 syscall 使得單執行緒的網路程式也具備處理多個連線的能力,而作為 Python 高階模組的 socketserver 也有實作一樣的功能,從它的 serve_forever() 原始碼中可以看到:

selectors.png

_ServerSelector原始碼如下,可以看到 socketserver 模組是利用另 1 個 Python 高階模組 selectors 提供 I/O 多工:

if hasattr(selectors, 'PollSelector'):
    _ServerSelector = selectors.PollSelector
else:
    _ServerSelector = selectors.SelectSelector

poll/select have the advantage of not requiring any extra file descriptor, contrarily to epoll/kqueue (also, they require a single syscall).

而 socketserver 在有 poll() syscall 可以使用時,會優先使用 PollSelector , 如果沒有則使用 SelectSelector ,只使用這 2 個的好處在於不需要額外的 file descriptor 而且只需要 1 個 system call 就好。

所以使用 socketserver 時,我們也不需要自己實作 I/O 多工的功能,就算是單執行緒,基本上也有一定水準的效能。

我們可以用 APACHE ab 指令稍微壓力測試一下,模擬 100 個 concurrent requests ,總量 1,000 個 requests 的情況:

$ ab -c 100 -n 1000 http://127.0.0.1:65432/

以下是其測試結果,可以看到 0.063 秒就完成處理 1,000 個 requests 之外,回應最長的時間為 5ms, 其餘 99% 的回應時間都在 1ms 以內。

This is ApacheBench, Version 2.3 <$Revision: 1903618 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 100 requests
Completed 200 requests
Completed 300 requests
Completed 400 requests
Completed 500 requests
Completed 600 requests
Completed 700 requests
Completed 800 requests
Completed 900 requests
Completed 1000 requests
Finished 1000 requests


Server Software:
Server Hostname:        127.0.0.1
Server Port:            65432

Document Path:          /
Document Length:        103 bytes

Concurrency Level:      100
Time taken for tests:   0.063 seconds
Complete requests:      1000
Failed requests:        0
Total transferred:      144000 bytes
HTML transferred:       103000 bytes
Requests per second:    15958.64 [#/sec] (mean)
Time per request:       6.266 [ms] (mean)
Time per request:       0.063 [ms] (mean, across all concurrent requests)
Transfer rate:          2244.18 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.3      0       3
Processing:     0    0   0.2      0       4
Waiting:        0    0   0.1      0       1
Total:          0    0   0.3      0       5

Percentage of the requests served within a certain time (ms)
  50%      0
  66%      0
  75%      0
  80%      0
  90%      0
  95%      1
  98%      1
  99%      1
 100%      5 (longest request)

不過這份測試報告實際上帶給我們的意義不大,主要是:

  1. 在 localhost 壓力測試無法反映真實網路傳輸的情形
  2. 這個 HTTP server 極其簡單,回應的 bytes 也很少,跟實際 production 使用情況落差相當大

這份報告只能告訴我們 socketserver 的實作方式具有一定的效能水準。

問題來了!單執行緒最怕執行緒被佔用!

我們知道 I/O multiplexing 賦予單執行緒能夠處理多個連線的能力,不過它仍有其極限存在,畢竟它終究是單執行緒,一旦唯一的執行緒被一直佔用執行某些運算的話,即使有 I/O multiplexing 也無法正常處理其他連線的 request 。

我們可以在 http_server.py 中加入 time.sleep(60) 模擬執行緒一直在執行某些運算的情況:

import socketserver
import time

class HTTPRequestHandler(socketserver.StreamRequestHandler):
    def handle(self):
        data = self.rfile.readline().strip()
        if data.startswith(b"GET /favicon.ico"):
            self.wfile.write(
                b"HTTP/1.1 404 Not Found\n"
                b"Content-Type: text/html\n\n"
                b"<html><body><h1>404 Not Found</h1></body></html>\n"
            )
            return

        self.wfile.write(
            (
                b"HTTP/1.1 200 OK\n"
                b"Content-Type: text/html\n\n"
                b"<html>"
                b"<head><title>Hello, socketserver</title></head>"
                b"<body><h1>Hello, socketserver!</h1></body>"
                b"</html>\n"
            )
        )
        time.sleep(60)


if __name__ == "__main__":
    HOST, PORT = "127.0.0.1", 65432
    with socketserver.TCPServer((HOST, PORT), HTTPRequestHandler) as server:
        print("Server started on", HOST, "port", PORT)
        server.serve_forever()

上述程式碼執行之後,我們可以打開瀏覽器,接著用 2 個分頁都輸入 http://127.0.0.1:65432/ ,將會發現其中 1 個分頁會有內容,另 1 個分頁則一直在等待內容,直到 60 秒過去,才會 2 個分頁都有內容,這就是執行緒被佔用,導致無法處理其他 requests 的情況!

多執行緒架構

為了解決前述單執行緒被佔用的問題,我們可以使用多執行緒架構彌補單執行緒架構的不足。

也就是每 1 個 request 都用 1 個新的執行緒處理,處理完畢之後再關閉執行緒即可。

作為 Python 高階模組的 socketserver 也提供多執行緒處理 requests 的功能,只要使用 ThreadingTCPServer 類別即可,改完之後的程式碼如下:

http_server.py

import socketserver
import time


class HTTPRequestHandler(socketserver.StreamRequestHandler):
    def handle(self):
        data = self.rfile.readline().strip()
        if data.startswith(b"GET /favicon.ico"):
            self.wfile.write(
                b"HTTP/1.1 404 Not Found\n"
                b"Content-Type: text/html\n\n"
                b"<html><body><h1>404 Not Found</h1></body></html>\n"
            )
            return

        self.wfile.write(
            (
                b"HTTP/1.1 200 OK\n"
                b"Content-Type: text/html\n\n"
                b"<html>"
                b"<head><title>Hello, socketserver</title></head>"
                b"<body><h1>Hello, socketserver!</h1></body>"
                b"</html>\n"
            )
        )
        time.sleep(60)


if __name__ == "__main__":
    HOST, PORT = "127.0.0.1", 65432
    with socketserver.ThreadingTCPServer((HOST, PORT), HTTPRequestHandler) as server:
        print("Server started on", HOST, "port", PORT)
        server.serve_forever()

上述程式碼執行之後,我們可以打開瀏覽器,接著用 2 個分頁都輸入 http://127.0.0.1:65432/ ,就會發現即使有 time.sleep(60) ,這次 2 個分頁都能正常顯示!

實際上 socketserver.ThreadingTCPServer 的定義如下:

class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

它僅僅只是多重繼承 ThreadingMixInTCPServer 2 個類別而已。

如果我們想知道多執行緒架構是怎麼實作的,則只要閱讀 ThreadingMixIn原始碼即可:

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
        except Exception:
            self.handle_error(request, client_address)
        finally:
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        if self.block_on_close:
            vars(self).setdefault('_threads', _Threads())
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        self._threads.append(t)
        t.start()

上述程式碼可以看到, ThreadingMixIn 其實覆寫(override)了 process_request 這個方法,並在該方法將 request 交給新的執行緒處理:

t = threading.Thread(
    target = self.process_request_thread,
    args = (request, client_address),
)
t.daemon = self.daemon_threads
self._threads.append(t)
t.start()

畫成圖的話,下圖橘色部分就是在新的執行緒中處理:

threading-flow.png

這就是 socketserver 用多執行緒處理 request 的架構。

目前為止,我們可以看到 socketserver 用 I/O 多工接收 requests, 並將 request 委派至新的執行緒中處理,避免單執行緒被佔用導致無法正常處理 requests 的情況。

問題又來啦!無限開執行緒可以嗎?

在 ThreadingMixIn 的程式碼中,我們可以注意到它並沒有限制執行緒的數量,如果每個 request 都會佔用執行緒一段時間,假設在短時間內湧進 1,000 個 requests, 那就可能會創造出 1,000 個執行緒,這必然造成 CPU context switch 的壓力。此外,建立執行緒與關閉執行緒,也都會耗用 CPU 運算資源。

這也是為什麼我們經常在一些伺服器的文件中看到執行緒數量有預設值的原因(通常建議執行緒數量的公式 CPU 核心數 x 2 到 4 之間的數字)。

Thread Pool

為了控制 CPU context switch 的開銷,以及減少執行緒一直開開關關的成本,可以使用一種稱為 Thread Pool 的技術,這個技術會預先建立固定數量的執行緒,並將它們放在 1 個 Pool 中(其實 Pool 只是 1 個抽象概念,我們可以用 dictionary, array, queue 等資料結構實作),如果需要執行緒處理工作,就從 Pool 中拿 1 個執行緒,當執行緒完成工作之後,再將其放回 Pool 。這樣一來,執行緒可以重複利用,從而減少重複建立執行緒的成本。

為了解決 ThreadMixIn 沒有限制執行緒數量的問題,我們可以用 Thread Pool 的技術解決。

做法則是:

  1. 建立好 1 個 Queue 儲存 request
  2. 建立好 n 個執行緒,並從第 1 步所建立的 Queue 中取得 request 並處理

threadpool-overview.png

改完之後的程式碼如下:

http_server.py

import selectors
import socket
import socketserver
import threading
from queue import Queue


class HTTPRequestHandler(socketserver.StreamRequestHandler):
    def handle(self):
        data = self.rfile.readline().strip()
        if data.startswith(b"GET /favicon.ico"):
            self.wfile.write(
                b"HTTP/1.1 404 Not Found\n"
                b"Content-Type: text/html\n\n"
                b"<html><body><h1>404 Not Found</h1></body></html>\n"
            )
            return

        self.wfile.write(
            (
                b"HTTP/1.1 200 OK\n"
                b"Content-Type: text/html\n\n"
                b"<html>"
                b"<head><title>Hello, socketserver</title></head>"
                b"<body><h1>Hello, socketserver!</h1></body>"
                b"</html>\n"
            )
        )


class ThreadPoolHTTPServer(socketserver.ThreadingTCPServer):
    allow_reuse_address = True
    threads = 4

    def serve_forever(self, poll_interval=0.5):
        print(f'Server is running with number of threads: {self.threads}')

        # set up the threadpool
        self.requests = Queue(self.threads)
        for x in range(self.threads):
            t = threading.Thread(target = self.process_request_thread)
            t.setDaemon(1)
            t.start()

        # server main loop
        try:
            with selectors.SelectSelector() as selector:
                selector.register(self, selectors.EVENT_READ)
                while True:
                    ready = selector.select(poll_interval)
                    if ready:
                        self.handle_request()
                    self.service_actions()
        finally:
            self.server_close()

    def process_request_thread(self):
        '''
        obtain request from queue instead of directly from server socket
        '''
        while True:
            q = self.requests.get()
            socketserver.ThreadingMixIn.process_request_thread(self, *q)
            self.requests.task_done()

    def handle_request(self):
        '''
        simply collect requests and put them on the queue for the workers.
        '''
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            self.requests.put((request, client_address))


if __name__ == "__main__":
    HOST, PORT = "127.0.0.1", 65432
    with ThreadPoolHTTPServer((HOST, PORT), HTTPRequestHandler) as server:
        print("Server started on", HOST, "port", PORT)
        server.serve_forever()

重點說明一下上述程式碼,首先 ThreadPoolHTTPServer 類別繼承了 ThreadingTCPServer 並且覆寫(override) 3 個方法:

  1. serve_forever()
  2. handle_request()
  3. process_request_thread()

serve_forever() 主要先建立 1 個 Queue 作為儲存 request 的地方:

self.requests = Queue(self.threads)

接著預先建立 n 個執行緒,此處 n 為 4:

for x in range(self.threads):
    t = threading.Thread(target = self.process_request_thread)
    t.setDaemon(1)
    t.start()

t.setDaemon(1) 會將執行緒設定成 daemon thread, 程式結束時(例如按 Ctrl + C),不管執行緒是否正在處理 request, 都會暴力將執行緒結束,比較好的作法是實作 graceful shutdown ,讓程式等待所有執行緒都處理完工作後結束。

這些執行緒執行的目標是 self.process_request_thread 方法,該方法如同前文所述,會從 Queue 中取得 request 並處理:

def process_request_thread(self):
    q = self.requests.get()
    socketserver.ThreadingMixIn.process_request_thread(self, *q)
    self.requests.task_done()

process_request_thread 重複利用 ThreadingMixIn.process_request_thread(self, request, client_address) ,省去重新開發的過程。

serve_forever() 剩下的部分則模仿 socketserver 原始碼中的邏輯,改為直接使用 selectors 模組中的 SelectSelector, 在接收到 EVENT_READ 事件時呼叫 handle_request() 方法處理 request:

try:
    with selectors.SelectSelector() as selector:
        selector.register(self, selectors.EVENT_READ)
        while True:
            ready = selector.select(poll_interval)
            if ready:
                self.handle_request()
                self.service_actions()
finally:
    self.server_close()

p.s. 此處也可以改用 PollSelector, EpollSelector 等等,詳見 Python 模組教學 - selectors

至於要怎麼把 request 放到 Queue 裡,則是 handle_request() 在接收到 request 時放進去的:

def handle_request(self):
    try:
        request, client_address = self.get_request()
    except socket.error:
        return
    if self.verify_request(request, client_address):
        self.requests.put((request, client_address))

這樣就完成 1 個具有 Thread Pool 功能的伺服器啦!

這個版本不僅限制了執行緒的數量,還減少重複建立執行緒的開銷!不過相對地,這個版本其實也是 1 種折衷方案,當所有執行緒都在忙碌時,或者 Queue 滿了,也會造成無法正常處理 request 的問題。

至此,你已經接觸了 I/O 多工、多執行緒以及 Thread Pool 3 種網路程式設計會用到的技術!

實際上,只要足夠理解這些概念,就算沒有 socketserver 模組的幫忙,你也能夠用各種模組或者程式語言開發出屬於自己的網路程式!

總結

I/O 多工、多執行緒以及 Thread Pool 是網路程式設計中的實用技術,儘管各大框架和伺服器已經將這些技術處理得很好,大多數開發者並不會特別涉及這些底層細節。然而,對這些技術有深刻理解,能夠更好地掌握在處理龐大流量時的效能最佳化。

以上!

Enjoy!

References

https://docs.python.org/3/library/socketserver.html

sockets with threadpool server python

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!