Handling screen output (and other side effects) during parallel processing is tricky business: if multiple processes all write to the same terminal simultaneously, the messages can become interleaved and garbled. What @parallel does in this case is collect each sub-process's stdout (it should probably do the same for stderr) into a file, and when each process completes it writes that process's stdout to the main process's stdout all at once.
There are several other ways to do this of course, and maybe this interface could provide more options, but there is no one general sane way to automatically overlap output from multiple simultaneous processes, at least not without some terminal-based GUI.
Here's one solution I came up with for functions decorated with Sage's @parallel decorator, though it was tricky enough that I think it points toward some possible improvements to @parallel itself.
First of all, we create a queue for handling log messages from each sub-process. When a process wants to log a message, it puts it on the queue (we'll see this later). Meanwhile, since @parallel calls block the main process, we need to start a separate process (this might also work with a thread) to read log messages off the queue and print them one at a time as they come in. The nice thing about multiprocessing.Queue is that it handles issues like IPC and synchronization for us:
    import os

    def handle_log_messages(q):
        while True:
            # This blocks until at least one message arrives on the queue
            message = q.get()
            if message is None:
                # We allow passing None as a "poison pill" to terminate the log handler
                break
            # Otherwise print the message and then wait for the next one
            print('[%d] %s' % (os.getpid(), message))
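To see the handler working on its own, here's a minimal sketch that drives it by hand, independent of @parallel (the message strings are just placeholders): start it as a background process, feed it a couple of messages, then send the None sentinel and wait for it to exit.

```python
import os
from multiprocessing import Process, Queue

def handle_log_messages(q):
    # Same handler as above, repeated so this sketch is self-contained
    while True:
        message = q.get()
        if message is None:
            break
        print('[%d] %s' % (os.getpid(), message))

q = Queue()
handler = Process(target=handle_log_messages, args=(q,))
handler.start()
q.put('first message')
q.put('second message')
q.put(None)     # the poison pill: the handler drains the queue, then exits
handler.join()
```

Because a single producer's messages arrive on the queue in FIFO order, both messages are always printed before the handler shuts down.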
Next, we modify the parallelized function to take the log queue as an optional second argument, and add a simple wrapper to make it easy to "log" messages by putting them on the queue:
    @parallel(1, timeout=10)
    def fun(x, log_queue=None):
        def log(message):
            if log_queue is not None:
                log_queue.put(message)

        log('Starting %d' % x)
        sleep(5)
        log('Finished %d' % x)
        return x * x
Now when we call fun we also need to pass in the log_queue as an argument if we want logging. It would be nice if we could just call it like fun([1, 2, 3], log_queue=log_queue), with the same keyword argument passed to every worker process. Unfortunately @parallel does not currently work that way, so I needed a somewhat ugly workaround, as you'll see later.
Now we instantiate the queue and the log handler:
    from multiprocessing import Queue, Process

    log_queue = Queue()
    log_handler = Process(target=handle_log_messages, args=(log_queue,))
    log_handler.start()  # starts handle_log_messages in the background, waiting for messages
Then we call our parallelized function. As mentioned above, we have to pass the same log_queue as the second argument to every call, which I did like this:
    from itertools import repeat

    values = [1, 2, 3]
    squared = fun(zip(values, repeat(log_queue)))
Now, while fun executes, each log message is printed more or less in the order it arrives, along with the PID of the worker process it came from. Once all the results have been collected, put None on the queue and join() the log handler to shut it down cleanly.
This could all probably be wrapped up in a more generic helper function, which might make a useful feature to add.
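As a rough sketch of what such a helper might look like (run_with_logging is a name I made up, the handler from above is repeated so the sketch stands alone, and func is assumed to be a @parallel-decorated function that accepts an iterable of (x, log_queue) tuples):

```python
import os
from itertools import repeat
from multiprocessing import Process, Queue

def handle_log_messages(q):
    # Print messages from the queue until the None sentinel arrives
    while True:
        message = q.get()
        if message is None:
            break
        print('[%d] %s' % (os.getpid(), message))

def run_with_logging(func, values):
    """Call a @parallel-decorated func on values, passing every call the
    same shared log queue, and return the (input, result) pairs."""
    log_queue = Queue()
    log_handler = Process(target=handle_log_messages, args=(log_queue,))
    log_handler.start()
    try:
        # Pair each input with the shared queue (the zip/repeat workaround
        # above) and drain the parallel iterator into a list
        return list(func(zip(values, repeat(log_queue))))
    finally:
        log_queue.put(None)  # poison pill stops the handler...
        log_handler.join()   # ...and we wait for it to finish printing
```

With a helper like this, the example above would collapse to run_with_logging(fun, [1, 2, 3]), with the queue setup and teardown handled automatically.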