How to Neatly Display Parallel Output in Terminal?
Posted on Mar 13, 2024 in Python Programming - Intermediate Level by Amo Chen ‐ 3 min read
When using modules like multiprocessing or threading for parallel processing, have you ever wondered how to print string messages from each process or thread effectively?
Most people simply use `print()` and watch the terminal. However, when the output is long or there is a lot of it, scrolling text is not particularly useful. In such scenarios, it's often better to write the output to a file so you can trace back any issues later.
Still, most of the time we want to know the latest status of each executing unit, such as which step it is currently on. If these messages keep piling up in the terminal, the terminal inevitably becomes quite messy.
In this article, I will introduce a method that refreshes and overwrites the output of parallel processing in place, so you can see the latest status of each unit while keeping the terminal clean and elegant!
Environment
- macOS
- Python 3
The method and example code used in this article are from Neat Parallel Output in Python.
Demo
Explanation of the Concept
To achieve the demo-style output, the process involves three main steps:
- Print the initial output.
- Before each update, clear the printed lines one by one, from bottom to top.
- Because the lines are cleared upwards, the cursor ends up back at the first line; printing the new output from there makes it appear that the display has been updated in place.
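The steps above can be sketched in a single process before any multiprocessing is involved. The repo names and line count here are illustrative:

```python
import sys
import time

# Step 1: the initial output.
lines = ["repoA: Starting", "repoB: Starting", "repoC: Starting"]

def redraw(new_lines, first=False):
    if not first:
        # Step 2: clear each previously printed line, bottom to top.
        sys.stdout.write("\x1b[1A\x1b[2K" * len(new_lines))
    # Step 3: the cursor is now back at the first line, so printing
    # again overlays the old output in place.
    for line in new_lines:
        print(line)

redraw(lines, first=True)
time.sleep(0.1)
redraw(["repoA: Building", "repoB: Installing", "repoC: Done"])
```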
Complete Code
Here is the complete code:
```python
import multiprocessing
import random
import time

multiprocessing.set_start_method('fork')

manager = multiprocessing.Manager()
terminal_lock = manager.Lock()
last_output_per_process = manager.dict()

repos = ["repoA", "repoB", "repoC", "repoD"]
num_procs = multiprocessing.cpu_count()
num_lines = min(len(repos), num_procs)


def randsleep():
    time.sleep(random.randint(1, 5))


def fill_output():
    to_fill = num_lines - len(last_output_per_process)
    for _ in range(to_fill):
        print()


def clean_up():
    for _ in range(num_lines):
        print("\x1b[1A\x1b[2K", end="")  # move up cursor and delete whole line


def log(repo_name, *args):
    with terminal_lock:
        last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
        clean_up()
        sorted_lines = last_output_per_process.items()
        for repo_name, last_line in sorted_lines:
            print(f"{repo_name}: {last_line}")
        fill_output()


def func(repo_name):
    log(repo_name, "Starting")
    randsleep()  # Can be substituted for actual work
    log(repo_name, "Installing")
    randsleep()
    log(repo_name, "Building")
    randsleep()
    log(repo_name, "Instrumenting")
    randsleep()
    log(repo_name, "Running tests")
    randsleep()
    log(repo_name, f"Result in {repo_name}.json")
    with terminal_lock:
        del last_output_per_process[repo_name]


def main():
    fill_output()
    with multiprocessing.Pool() as pool:
        pool.map(func, repos, chunksize=1)
    clean_up()


if __name__ == '__main__':
    main()
```
Key Points Explained
Let’s break down the key sections of the code presented earlier.
First, to ensure only one process updates the terminal at a time, use Python’s Manager with a lock. All processes must acquire this lock before updating the terminal to prevent simultaneous updates:
```python
manager = multiprocessing.Manager()
terminal_lock = manager.Lock()
```
Since each process doesn’t know the latest status of others, a Manager dict is used to store the state of each process:
```python
last_output_per_process = manager.dict()
```
The dict has the following structure:
```python
{
    'repo name': 'Content of last print',
    ...snip...
}
```
In this way, whenever a process updates the terminal, it can print out the statuses of other processes using the dict.
The crucial part is the function that clears outputs line by line upwards:
```python
def clean_up():
    for _ in range(num_lines):
        print("\x1b[1A\x1b[2K", end="")
```
`\x1b[1A\x1b[2K` is a pair of ANSI escape sequences: `\x1b[1A` moves the cursor up one line, and `\x1b[2K` erases that entire line.
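To see the effect of these escape sequences in isolation, here is a tiny standalone example (the progress messages are made up for illustration):

```python
import time

# Each print leaves the cursor on a fresh line. "\x1b[1A" moves the
# cursor up one line and "\x1b[2K" erases that entire line, so the next
# print overwrites the old message in place.
print("progress: 0%")
time.sleep(0.2)
print("\x1b[1A\x1b[2K", end="")
print("progress: 100%")
```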
Finally, let’s go over the `log(repo_name, *args)` function, which writes its message to `last_output_per_process`, clears the previous output line by line, and then prints the new output:
```python
def log(repo_name, *args):
    with terminal_lock:
        last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
        clean_up()
        sorted_lines = last_output_per_process.items()
        for repo_name, last_line in sorted_lines:
            print(f"{repo_name}: {last_line}")
        fill_output()
```
And that’s how you achieve elegant parallel output!
Conclusion
After reading Neat Parallel Output in Python, I came away with a brilliant approach to handling parallel output. Truly enlightening!
That’s all!
Enjoy!
References
https://bernsteinbear.com/blog/python-parallel-output/