# Printing from parallel processes

I am trying to use Sage's parallelization with @parallel(p_iter='fork'). (I need fork specifically to work around the problems brought up here.)

If I use print to show some messages, they only appear when the parallelized function has finished. Example:

    @parallel(1, timeout=10)
    def fun(x):
        print('Starting %d' % x)
        sleep(5)
        print('Finished %d' % x)
        return x*x

    list(fun([2,3,4]))


The "Starting" and the "Finished" messages show up simultaneously, even though they are issued with a 5 second time difference.

How can I print a message so that it will show up immediately?

I need this when I am running a Sage script in a terminal. I am not using Sage interactively in this case and I am not using the notebook interface.


Handling screen output (and other side effects) during parallel processing is tricky business: if multiple processes all write to the same terminal simultaneously, their messages can overlap and become garbled. It seems that what @parallel does in this case is collect each sub-process's stdout (it should probably also do this for stderr) into a file, and then, when each process completes, write that process's stdout all at once to your main process's stdout.
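
To make that "collect, then emit" behavior concrete, here is a rough sketch in plain Python. It is only an illustration of the idea and not Sage's actual implementation; the names run_and_buffer and demo are made up for this example, and it assumes a Unix system where os.fork is available:

```python
import os
import sys
import tempfile


def run_and_buffer(func, arg):
    """Run func(arg) in a forked child and return everything it printed.

    Illustration only: this mimics the buffering described above,
    it is not how Sage's @parallel is actually written.
    """
    with tempfile.TemporaryFile(mode='w+') as buf:
        pid = os.fork()
        if pid == 0:
            # Child: send print() output to the shared temporary file
            try:
                sys.stdout = buf
                func(arg)
                buf.flush()
            finally:
                # Leave immediately, without running the parent's cleanup code
                os._exit(0)
        # Parent: nothing the child printed is visible until it has finished
        os.waitpid(pid, 0)
        buf.seek(0)
        return buf.read()


def demo(x):
    print('Starting %d' % x)
    print('Finished %d' % x)


if __name__ == '__main__':
    # Both lines come out together, after the child has exited
    print(run_and_buffer(demo, 2), end='')
```

Because the parent only reads the file after waitpid returns, both messages appear at once no matter how far apart the child printed them.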

There are several other ways to do this, of course, and maybe this interface could provide more options, but there is no single sane, general way to automatically interleave output from multiple simultaneous processes, at least not without some terminal-based GUI.

Here's one solution I came up with for functions decorated with Sage's @parallel decorator, though it was tricky enough that I think it points toward some ways @parallel could be improved.

First of all, we create a queue for handling log messages from each sub-process. When a process wants to log a message, it does so by putting it on the queue (we'll see this later). Meanwhile, since @parallel processes block the main process when called, we need to start a separate process (this might also work with a thread) to read log messages off the queue and print them one at a time as they come in. The nice thing about multiprocessing.Queue is that it handles issues like IPC and synchronization for us:

    import os

    def handle_log_messages(q):
        while True:
            # This blocks until at least one message arrives on the queue
            message = q.get()
            if message is None:
                # We allow passing None as a "poison pill" to terminate the log handler
                break

            # Otherwise print the message and then wait for the next one
            print('[%d] %s' % (os.getpid(), message))
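
The log handler might also run as a thread instead of a separate process, as suggested in passing above. Here is a minimal sketch of that variant in plain Python. It has not been tried together with @parallel, and the names start_log_thread and emit are made up for this illustration:

```python
import threading
from multiprocessing import get_context


def handle_log_messages(q, emit=print):
    """Pass messages from q to emit() until the None poison pill arrives."""
    while True:
        message = q.get()  # blocks until a message is available
        if message is None:
            break
        emit(message)


def start_log_thread(q, emit=print):
    """Hypothetical helper: run the log handler in a background thread."""
    thread = threading.Thread(target=handle_log_messages, args=(q, emit))
    thread.daemon = True  # do not keep the interpreter alive if the pill is never sent
    thread.start()
    return thread


if __name__ == '__main__':
    # 'fork' matches the p_iter='fork' setting from the question (Unix only)
    log_queue = get_context('fork').Queue()
    log_thread = start_log_thread(log_queue)
    log_queue.put('hello from the main process')
    log_queue.put(None)  # poison pill
    log_thread.join()
```

A thread avoids starting one more process, but mixing threads with fork can be fragile, so the separate process is probably the safer default.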


Next, we modify the parallelized function to take the log queue as an optional second argument, and add a simple wrapper to make it easy to "log" messages by putting them on the queue:

    @parallel(1, timeout=10)
    def fun(x, log_queue=None):
        def log(message):
            if log_queue is not None:
                log_queue.put(message)
        log('Starting %d' % x)
        sleep(5)
        log('Finished %d' % x)
        return x*x


Now when we call fun we also need to pass in the log_queue as an argument if we want logging. It would be nice if we could just call it like fun([1, 2, 3], log_queue=log_queue) where for keyword arguments the same value is passed in for each worker process. Unfortunately it does not currently work that way so I needed a kinda ugly workaround as you'll see later.

Now we instantiate the queue and the log handler:

    from multiprocessing import Queue, Process
    log_queue = Queue()
    log_handler = Process(target=handle_log_messages, args=(log_queue,))
    log_handler.start()  # Starts handle_log_messages in the background and starts waiting for messages


Then call our parallelized function. As mentioned previously we have to pass the same log_queue as the second argument to the function, which I did like this:

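    from itertools import repeat
    values = [1, 2, 3]
    # list(zip(...)) gives @parallel a real list; the outer list() runs the lazy generator
    squared = list(fun(list(zip(values, repeat(log_queue)))))
    log_queue.put(None)  # poison pill: stop the log handler
    log_handler.join()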


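Now, while fun executes, each log message is printed more or less in the order it arrives, prefixed with a PID. Note that as written this is the PID of the log handler process, because that is where os.getpid() is called; to show the PID of the worker a message came from, the worker would have to include its own os.getpid() in the message it puts on the queue.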
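
Since os.getpid() in the handler reports the handler's own PID, one way to tag each message with the PID of the worker that sent it is to put (pid, message) pairs on the queue. The following is a sketch under that assumption; make_logger, format_record and worker are hypothetical names, and a plain multiprocessing process stands in for an @parallel worker so the example runs without Sage:

```python
import os
from multiprocessing import get_context


def make_logger(log_queue):
    """Return a log() function that records the PID of the calling process."""
    def log(message):
        if log_queue is not None:
            # os.getpid() is evaluated in the worker, not in the log handler
            log_queue.put((os.getpid(), message))
    return log


def format_record(record):
    """Format a (pid, message) pair the way the log handler would print it."""
    pid, message = record
    return '[%d] %s' % (pid, message)


def worker(x, log_queue=None):
    """Stand-in for the decorated function; logs two messages."""
    log = make_logger(log_queue)
    log('Starting %d' % x)
    log('Finished %d' % x)
    return x * x


if __name__ == '__main__':
    ctx = get_context('fork')  # matches p_iter='fork' (Unix only)
    q = ctx.Queue()
    p = ctx.Process(target=worker, args=(3, q))
    p.start()
    print(format_record(q.get()))
    print(format_record(q.get()))
    p.join()
```

The log handler would then call print(format_record(message)) instead of formatting with its own PID.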

This could all probably be wrapped up in a more generic helper function, which might make a useful feature to add.
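
As a rough idea of what such a helper could look like, here is a sketch of a context manager that starts the log handler, hands out the queue, and sends the poison pill on exit. The names log_listener, emit and FORK are made up for this illustration, nothing like this exists in Sage as far as I know, and the @parallel usage shown in the comment is untested:

```python
from contextlib import contextmanager
from multiprocessing import get_context

# 'fork' matches the p_iter='fork' setting from the question (Unix only)
FORK = get_context('fork')


def handle_log_messages(q, emit=print):
    """Pass messages from q to emit() until the None poison pill arrives."""
    while True:
        message = q.get()
        if message is None:
            break
        emit(message)


@contextmanager
def log_listener(emit=print):
    """Hypothetical helper: yield a queue whose messages are emitted as they arrive."""
    q = FORK.Queue()
    handler = FORK.Process(target=handle_log_messages, args=(q, emit))
    handler.start()
    try:
        yield q
    finally:
        q.put(None)     # poison pill: let the handler finish
        handler.join()  # wait until all pending messages are emitted


# Intended use with a function decorated as above (requires Sage):
#
#     with log_listener() as log_queue:
#         results = list(fun([(x, log_queue) for x in [1, 2, 3]]))
```

Using a context manager means the handler is shut down even if one of the workers raises, which the manual start/stop sequence does not guarantee.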
