# Calling a @parallel-decorated function on an iterator

Say I've decorated a function f with @parallel and then, instead of evaluating f at a list of inputs, I evaluate f at a generator (whose list of yields is meant to be interpreted as the list of inputs at which I want to evaluate f). Silly example:

def my_iterator(n):
    for i in xrange(n):
        yield i

@parallel(verbose=True)
def f(x):
    return x^2

for x in f(my_iterator(2^5)):
    print x


This works: Sage interprets the input "my_iterator(2^5)" to f correctly. However, replacing 2^5 with 2^30 shows that Sage goes about this by first building a list of everything my_iterator(2^30) yields and only then distributing the elements of that list to forked subprocesses. That is,

for x in f(my_iterator(2^30)):
    print x


is functionally identical to

for x in f([x for x in my_iterator(2^30)]):
    print x


which is horrible. Instead of starting to yield outputs immediately and using virtually no memory, Sage consumes all available memory (as it tries to build a list of length 2^30) and then the computation just dies. Even worse, when it dies it stops silently with no output, despite the "verbose=True" option.

When setting up lengthy parallelized computations, it sometimes makes sense to create the inputs to a parallelized function with a generator (either algorithmically or by, say, reading an input file). It would be nice if those inputs could be sent out for processing via the @parallel mechanism as soon as they are generated, instead of every input being created and stored in memory before the first forked subprocess begins. I guess I would want the interior generator ("my_iterator(2^30)" in the above example) to yield its next result whenever a core is available for a forked subprocess, with the parallel computation running until the interior generator has nothing left to yield and all subprocesses have returned.
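To make the behaviour I have in mind concrete, here is a rough sketch in plain Python rather than Sage. It does not use @parallel at all; it relies on the standard-library concurrent.futures module, and the names lazy_parallel_map and executor_cls are made up for illustration. It defaults to a thread pool; for CPU-bound work one would presumably pass ProcessPoolExecutor instead. The point is only that a new input is pulled from the generator when a worker slot frees up.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice


def lazy_parallel_map(func, inputs, max_workers=4, executor_cls=ThreadPoolExecutor):
    """Yield (input, output) pairs, pulling from `inputs` only when a slot is free.

    Hypothetical helper: at most `max_workers` inputs are held in memory at once.
    Results come back in completion order, not input order.
    """
    inputs = iter(inputs)
    with executor_cls(max_workers=max_workers) as pool:
        # Prime the pool with at most max_workers tasks.
        pending = {pool.submit(func, x): x for x in islice(inputs, max_workers)}
        while pending:
            done, _ = wait(list(pending), return_when=FIRST_COMPLETED)
            for fut in done:
                x = pending.pop(fut)
                yield x, fut.result()
                # Refill the freed slot with the next input, if any remain.
                for nxt in islice(inputs, 1):
                    pending[pool.submit(func, nxt)] = nxt
```

With something like this, looping over lazy_parallel_map(f, my_iterator(n)) would start producing results immediately and never hold more than max_workers inputs at a time.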

So my question: is there a simple workaround, a modification of the source code, or an alternative method for achieving this?


I just ran into this problem too; one idea for a workaround would be to make a helper function which takes a generator and passes smallish chunks from it to the parallel-decorated function.

(2013-07-02 02:54:17 -0600)
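A minimal sketch of the helper suggested in this comment, written in plain Python so it can be tried outside Sage. The names chunked and map_in_chunks are hypothetical, and list_func stands in for a @parallel-decorated function: the only assumption is that it accepts a list and returns an iterable of results.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from any iterable."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def map_in_chunks(list_func, inputs, size=1000):
    """Feed `inputs` to `list_func` one chunk at a time and yield its results.

    Only one chunk of inputs is materialized in memory at any moment.
    """
    for chunk in chunked(inputs, size):
        for result in list_func(chunk):
            yield result
```

The trade-off is that each chunk must finish completely before the next one starts, so a single slow input can leave the other cores idle until the chunk is done.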


Niles is right. I also ran into this problem recently but found a straightforward way to solve it. I have some sample code below with detailed explanation. First, let's see a simple comparison between serial and parallel execution of a function.

@parallel(p_iter='multiprocessing', ncpus=3)
def test_parallel(n):
    f = factor(2^128+n)
    return len(f)

t=walltime()
r = range(1000)
p = sorted(list( test_parallel(r)))
print p[-1]
print walltime(t)

t=walltime()
for i in range(1000):
    f = factor(2^128+i)
print f
print walltime(t)


(((999,), {}), 5)
6.359593153
5 * 23 * 383 * 1088533 * 7097431848554855065803619703
17.0849101543


test_parallel is a simple test function that takes a nontrivial time to execute. It returns the number of distinct prime factors of 2^128+n. The argument of test_parallel is a list created by the range function. Note that this has to be a list; there is currently no alternative. For example, xrange cannot be used in place of range, because xrange generates numbers on the fly rather than creating a whole list of them. This can be a serious problem (mainly a memory problem), but it can be overcome, as will be shown further below.

Like any parallel-decorated function, test_parallel returns a special object: an iterator over 2-tuples, which arrive in no guaranteed order. So the output (((999,), {}), 5) above, which is the last item p[-1] of the sorted results, contains the input value n=999, an empty dictionary {} of keyword arguments, and the return value 5. To parse the output of test_parallel by input, it should be converted to a list and sorted.
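As an illustration of that result format, here is a small plain-Python sketch that unpacks ((args, kwargs), value) pairs without relying on the order in which they arrive. The helper name unpack_results and the sample values are made up for the example; they are not output from Sage.

```python
def unpack_results(results):
    """Map each first positional argument to its return value.

    `results` is any iterable of ((args, kwargs), value) pairs.
    """
    return {args[0]: value for (args, kwargs), value in results}


# Made-up sample data, deliberately out of order.
simulated = [(((2,), {}), 3), (((0,), {}), 1), (((1,), {}), 2)]

by_input = unpack_results(simulated)
ordered_values = [by_input[n] for n in sorted(by_input)]
```

Keying on the input like this gives the same ordered values as sorting the list of tuples, and it also makes it easy to look up the result for one particular input.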

In this particular run the parallel calculation (using 3 cores) took some 6 seconds whereas at the bottom of the sample code the serial equivalent took some 17 seconds to execute (and the result of the last factorization confirms that there were 5 distinct factors).

This is all well and good, and I greatly appreciate the developers' work. Unfortunately, a serious problem arises when the list argument to a parallel function grows too big. One solution that has worked very well for me combines chunks, as Niles suggested, with numpy arrays. The following code is a more robust and significantly more memory-efficient alternative to the naive parallel code above.

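%timeit
import numpy as np

sizof_chunk = 10^3
numof_chunk = 10^2
# One uint8 slot per input; results are written chunk by chunk.
np_array    = np.zeros((sizof_chunk*numof_chunk,), dtype=np.uint8)

for i in range(numof_chunk):

    beg    = i *   sizof_chunk
    end    = beg + sizof_chunk
    # Sorting the ((args, kwargs), value) tuples restores input order.
    tuples = sorted(list(test_parallel(range(beg,end))))
    # Keep only the return values (renamed from iter to avoid shadowing the builtin).
    counts = [ x[1] for x in tuples ]
    np_array[beg:end] = np.fromiter(counts, np.uint8)

print np_array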


[1 2 3 ..., 6 8 3]
CPU time: 13.88 s,  Wall time: 670.06 s


sizof_chunk is set to 1000, the same number of inputs as in the first example, and numof_chunk can be set to anything. If it is set to 1, the calculation is exactly the same as above (and will take about 6 seconds ...
