如何在終端機(terminal)漂亮地平行列印文字訊息(parallel output)?
Posted on Mar 13, 2024 in Python 程式設計 - 中階 by Amo Chen ‐ 2 min read
在使用 multiprocessing, threading 等模組跑平行處理時,如果需要在每個執行單位(process, thread)列印一些字串的話,你會怎麼做?
我想大多數人都直接 print()
到 terminal 就好,但是輸出的文字太多太長時,通常都沒有太大助益,這時候最好將輸出寫到檔案,一旦有問題就可以從檔案中尋找問題。
但是,我們多數時候還是想知道執行單位最新的運作狀態,例如正在執行哪個步驟,這些輸出一直附加到 terminal 上的話,不免還是會讓 terminal 顯得難看⋯⋯。
因此,本文將介紹 1 種方法讓平行處理的輸出可以清掉舊的輸出(output),並覆蓋新的輸出(output)上去,如此一來不僅可以知道每個執行單位的最新狀況,還可以讓 terminal 顯得乾淨、優雅!
本文環境
- macOS
- Python 3
本文所使用的方法與範例程式從 Neat parallel output in Python 而來。
Demo
原理介紹
要做到 Demo 那般的輸出方式,其原理如下,主要 3 個步驟。
- 列印輸出
- 更新輸出前,先逐行從底部向上清空每行輸出
- 由於我們逐行向上清空輸出,所以 cursor 會回到第 1 行,再列印新的輸出時,又會從第 1 行開始輸出,效果就像更新輸出一樣
完整程式碼
完整的程式碼如下:
import multiprocessing
import random
import time
multiprocessing.set_start_method('fork')
manager = multiprocessing.Manager()
terminal_lock = manager.Lock()
last_output_per_process = manager.dict()
repos = ["repoA", "repoB", "repoC", "repoD"]
num_procs = multiprocessing.cpu_count()
num_lines = min(len(repos), num_procs)
def randsleep():
time.sleep(random.randint(1, 5))
def fill_output():
to_fill = num_lines - len(last_output_per_process)
for _ in range(to_fill):
print()
def clean_up():
for _ in range(num_lines):
print("\x1b[1A\x1b[2K", end="") # move up cursor and delete whole line
def log(repo_name, *args):
with terminal_lock:
last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
clean_up()
sorted_lines = last_output_per_process.items()
for repo_name, last_line in sorted_lines:
print(f"{repo_name}: {last_line}")
fill_output()
def func(repo_name):
log(repo_name, "Starting")
randsleep() # Can be substituted for actual work
log(repo_name, "Installing")
randsleep()
log(repo_name, "Building")
randsleep()
log(repo_name, "Instrumenting")
randsleep()
log(repo_name, "Running tests")
randsleep()
log(repo_name, f"Result in {repo_name}.json")
with terminal_lock:
del last_output_per_process[repo_name]
def main():
fill_output()
with multiprocessing.Pool() as pool:
pool.map(func, repos, chunksize=1)
clean_up()
if __name__ == '__main__':
main()
關鍵部分說明
接著,解說前述程式碼的關鍵部分。
首先,為了確保同一時間只有 1 個 process 可以更新 terminal, 所以使用 Python 的 Manager 的 Lock, 所有 processes 在更新 terminal 之前,必須先取得 lock, 確保不會有 terminal 同時被更新的情況發生:
manager = multiprocessing.Manager()
terminal_lock = manager.Lock()
由於每個 process 並不知道彼此最新狀態為何,因此新增 1 個 Manager 的 dict 儲存每個 process 各自的狀態:
last_output_per_process = manager.dict()
其儲存結構為:
{
'repo name': '最後一次 print 的內容',
...略...
}
如此一來,每個 process 在更新 terminal 時,都可以藉由該 dict 把其他 processes 的狀態一起列印出來。
最關鍵的部分是逐行向上清除輸出的函式:
def clean_up():
for _ in range(num_lines):
print("\x1b[1A\x1b[2K", end="")
\x1b[1A\x1b[2K
是一連串的特殊 ANSI 碼,簡單來說就是 cursor 移至上 1 行,並清除該行內容。
最後解說 log(repo, *args)
函式,該函式做的事情很簡單,將自己的輸出寫入 last_output_per_process
之後,先逐行清除輸出,再列印新的輸出:
def log(repo_name, *args):
with terminal_lock:
last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
clean_up()
sorted_lines = last_output_per_process.items()
for repo_name, last_line in sorted_lines:
print(f"{repo_name}: {last_line}")
fill_output()
以上,就能夠做到優雅的平行輸出啦!
總結
看到 Neat parallel output in Python 一文才知道針對 parallel output 有如此讚的作法,著實上了一課!推推!
以上!
Enjoy!
References
https://bernsteinbear.com/blog/python-parallel-output/