如何在終端機(terminal)漂亮地平行列印文字訊息(parallel output)?

Posted on  Mar 13, 2024  in  Python 程式設計 - 中階  by  Amo Chen  ‐ 2 min read

在使用 multiprocessing, threading 等模組跑平行處理時,如果需要在每個執行單位(process, thread)列印一些字串的話,你會怎麼做?

我想大多數人都直接 print() 到 terminal 就好,但是輸出的文字太多太長時,通常都沒有太大助益,這時候最好將輸出寫到檔案,一旦有問題就可以從檔案中尋找問題。

但是,我們多數時候還是想知道執行單位最新的運作狀態,例如正在執行哪個步驟,這些輸出一直附加到 terminal 上的話,不免還是會讓 terminal 顯得難看⋯⋯。

因此,本文將介紹 1 種方法讓平行處理的輸出可以清掉舊的輸出(output),並覆蓋新的輸出(output)上去,如此一來不僅可以知道每個執行單位的最新狀況,還可以讓 terminal 顯得乾淨、優雅!

本文環境

  • macOS
  • Python 3

本文所使用的方法與範例程式從 Neat parallel output in Python 而來。

Demo

原理介紹

要做到 Demo 那般的輸出方式,其原理如下,主要 3 個步驟。

  1. 列印輸出

neat-parallel-output-1.png

  1. 更新輸出前,先逐行從底部向上清空每行輸出

neat-parallel-output-2.png

  1. 由於我們逐行向上清空輸出,所以 cursor 會回到第 1 行,再列印新的輸出時,又會從第 1 行開始輸出,效果就像更新輸出一樣

neat-parallel-output-3.png

完整程式碼

完整的程式碼如下:

import multiprocessing
import random
import time


multiprocessing.set_start_method('fork')

manager = multiprocessing.Manager()
terminal_lock = manager.Lock()
last_output_per_process = manager.dict()

repos = ["repoA", "repoB", "repoC", "repoD"]
num_procs = multiprocessing.cpu_count()
num_lines = min(len(repos), num_procs)


def randsleep():
    time.sleep(random.randint(1, 5))


def fill_output():
    to_fill = num_lines - len(last_output_per_process)
    for _ in range(to_fill):
        print()


def clean_up():
    for _ in range(num_lines):
        print("\x1b[1A\x1b[2K", end="")  # move up cursor and delete whole line


def log(repo_name, *args):
    with terminal_lock:
        last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
        clean_up()
        sorted_lines = last_output_per_process.items()
        for repo_name, last_line in sorted_lines:
            print(f"{repo_name}: {last_line}")
        fill_output()


def func(repo_name):
    log(repo_name, "Starting")
    randsleep()  # Can be substituted for actual work
    log(repo_name, "Installing")
    randsleep()
    log(repo_name, "Building")
    randsleep()
    log(repo_name, "Instrumenting")
    randsleep()
    log(repo_name, "Running tests")
    randsleep()
    log(repo_name, f"Result in {repo_name}.json")
    with terminal_lock:
        del last_output_per_process[repo_name]


def main():
    fill_output()
    with multiprocessing.Pool() as pool:
        pool.map(func, repos, chunksize=1)
    clean_up()


if __name__ == '__main__':
    main()

關鍵部分說明

接著,解說前述程式碼的關鍵部分。

首先,為了確保同一時間只有 1 個 process 可以更新 terminal, 所以使用 Python 的 Manager 的 Lock, 所有 processes 在更新 terminal 之前,必須先取得 lock, 確保不會有 terminal 同時被更新的情況發生:

manager = multiprocessing.Manager()
terminal_lock = manager.Lock()

由於每個 process 並不知道彼此最新狀態為何,因此新增 1 個 Manager 的 dict 儲存每個 process 各自的狀態:

last_output_per_process = manager.dict()

其儲存結構為:

{
    'repo name': '最後一次 print 的內容',
    ...略...
}

如此一來,每個 process 在更新 terminal 時,都可以藉由該 dict 把其他 processes 的狀態一起列印出來。

最關鍵的部分是逐行向上清除輸出的函式

def clean_up():
    for _ in range(num_lines):
        print("\x1b[1A\x1b[2K", end="")

\x1b[1A\x1b[2K 是一連串的特殊 ANSI 碼,簡單來說就是 cursor 移至上 1 行,並清除該行內容。

最後解說 log(repo, *args) 函式,該函式做的事情很簡單,將自己的輸出寫入 last_output_per_process 之後,先逐行清除輸出,再列印新的輸出:

def log(repo_name, *args):
    with terminal_lock:
        last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
        clean_up()
        sorted_lines = last_output_per_process.items()
        for repo_name, last_line in sorted_lines:
            print(f"{repo_name}: {last_line}")
        fill_output()

以上,就能夠做到優雅的平行輸出啦!

總結

看到 Neat parallel output in Python 一文才知道針對 parallel output 有如此讚的作法,著實上了一課!推推!

以上!

Enjoy!

References

https://bernsteinbear.com/blog/python-parallel-output/

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!