ASKSAGE: Sage Q&A Forum - RSS feed
https://ask.sagemath.org/questions/
Q&A Forum for Sage (en). Copyright Sage, 2010. Some rights reserved under Creative Commons license.
Tue, 21 Jun 2016 23:18:16 +0200

calling a parallel decorated function on an iterator
https://ask.sagemath.org/question/9886/calling-a-parallel-decorated-function-on-an-iterator/

Say I've decorated a function f with @parallel and then, instead of evaluating f at a list of inputs, I evaluate f at a generator (whose list of yields is meant to be interpreted as the list of inputs at which I want to evaluate f). Silly example:
def my_iterator(n):
    for i in xrange(n):
        yield i

@parallel(verbose=True)
def f(x):
    return x^2

for x in f(my_iterator(2^5)):
    print x
This works---Sage interprets the input "my_iterator(2^5)" to f correctly. However, replacing 2^5 with 2^30 shows that the way Sage goes about trying to compute this is by first building a <i>list</i> of the yields of my_iterator(2^30) and then trying to distribute the elements of that list to forked subprocesses. That is,
for x in f(my_iterator(2^30)):
    print x
is functionally identical to
for x in f([x for x in my_iterator(2^30)]):
    print x
which is horrible. Instead of starting to yield outputs immediately and using virtually no memory, Sage consumes all available memory (as it tries to build a list of length 2^30) and then the computation just dies. Even worse, when it dies it stops silently with no output, despite the "verbose=True" option.
When setting up lengthy parallelized computations, it sometimes makes sense to create the inputs to a parallelized function using a generator (either algorithmically or by, say, reading an input file), and it would be nice if we could start sending those inputs out via the @parallel mechanism for processing as soon as they're generated, instead of creating and storing every input in memory before the first forked subprocess begins. I guess I would want the interior generator ("my_iterator(2^30)" in the above example) to yield its next result to be sent to a forked subprocess whenever a core is available for that subprocess, and the parallel computation to run until the interior generator is out of things to yield and all subprocesses have returned.
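As a point of comparison (not Sage's @parallel, just a plain-Python sketch): the standard library's multiprocessing.Pool.imap pulls inputs from an iterator on demand rather than materializing a list first, which is roughly the streaming behavior I'd want. (CPython's task-handler thread may still buffer pending tasks internally, so this is not a complete fix for truly huge inputs, but it illustrates the desired interface; the function and sizes mirror the example above, written with ** since this is plain Python.)

```python
from multiprocessing import Pool

def my_iterator(n):
    # same generator as above, in plain Python
    for i in range(n):
        yield i

def f(x):
    return x ** 2  # plain-Python spelling of x^2

if __name__ == "__main__":
    with Pool(4) as pool:
        # imap pulls inputs from the generator instead of list()-ing it first
        for x in pool.imap(f, my_iterator(2 ** 5), chunksize=8):
            print(x)
```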
So my question. Is there a simple workaround, modification of the source code, or alternative method for achieving this?
Wed, 06 Mar 2013 22:15:31 +0100
https://ask.sagemath.org/question/9886/calling-a-parallel-decorated-function-on-an-iterator/

Comment by niles:
https://ask.sagemath.org/question/9886/calling-a-parallel-decorated-function-on-an-iterator/?comment=17390#post-id-17390
I just ran into this problem too; one idea for a workaround would be to make a helper function which takes a generator and passes smallish chunks from it to the parallel-decorated function.
Tue, 02 Jul 2013 09:54:17 +0200

Answer by ikol:
https://ask.sagemath.org/question/9886/calling-a-parallel-decorated-function-on-an-iterator/?answer=33876#post-id-33876
Niles is right. I also ran into this problem recently but found a straightforward way to solve it. I have some sample code below with a detailed explanation. First, let's see a simple comparison between serial and parallel execution of a function.
@parallel(p_iter='multiprocessing', ncpus=3)
def test_parallel(n):
    f = factor(2^128+n)
    return len(f)

t = walltime()
r = range(1000)
p = sorted(list(test_parallel(r)))
print p[-1]
print walltime(t)

t = walltime()
for i in range(1000):
    f = factor(2^128+i)
print f
print walltime(t)
----------
(((999,), {}), 5)
6.359593153
5 * 23 * 383 * 1088533 * 7097431848554855065803619703
17.0849101543
`test_parallel` is a simple function that takes a nontrivial time to execute, for testing purposes: it returns the number of distinct prime factors of 2^128+n. The argument of `test_parallel` is a list created by the `range` function. Note that this has to be a list; there is currently no alternative, so e.g. `xrange` cannot be used in place of `range`, because `xrange` generates numbers on the fly rather than creating a whole list of them. This can be a serious problem (mainly a memory problem), but it can be overcome, as shown further below.

`test_parallel`, like any @parallel-decorated function, returns a special object, which is an iterator over 2-tuples, and the order of the 2-tuples is entirely arbitrary! So the output `(((999,), {}), 5)` above, representing the last item in the calculation `p[-1]`, includes the input value `n=999`, an empty keyword-argument dictionary `{}`, and the return value `5`. It should be noted that in order to parse the output from `test_parallel` in input order, it should be cast to a sorted list.
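To make the parsing concrete, here is a tiny plain-Python mock (the pairs below are invented for illustration, not real output): each item has the shape ((args, kwargs), value), and sorting on the args part restores input order:

```python
# Invented mock of the ((args, kwargs), value) 2-tuples that a
# @parallel-decorated call yields, arriving in arbitrary order.
raw = [(((999,), {}), 5), (((0,), {}), 1), (((1,), {}), 2)]

pairs = sorted(raw)  # orders by the input args; 999, 1, 0 never tie,
                     # so the {} dicts are never compared
values = [value for (_args_kw, value) in pairs]
print(values)  # -> [1, 2, 5]
```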
In this particular run the parallel calculation (using 3 cores) took about 6 seconds, whereas at the bottom of the sample code the serial equivalent took about 17 seconds to execute (and the result of the last factorization confirms that there were 5 distinct prime factors).
This is all well, and my great appreciation goes to the developers. Unfortunately, however, a serious problem arises when the list argument to a parallel function grows too big. One solution that has worked very well for me involves chunks, as Niles suggested, plus NumPy arrays. The following code is a more robust and significantly more efficient alternative to the naive parallel code above.
%time
import numpy as np

sizof_chunk = 10^3
numof_chunk = 10^2
np_array = np.zeros((sizof_chunk*numof_chunk,), dtype=np.uint8)
for i in range(numof_chunk):
    beg = i * sizof_chunk
    end = beg + sizof_chunk
    tuples = sorted(list(test_parallel(range(beg, end))))
    iter = [x[1] for x in tuples]
    np_array[beg:end] = np.fromiter(iter, np.uint8)
print np_array
----------
[1 2 3 ..., 6 8 3]
CPU time: 13.88 s, Wall time: 670.06 s
`sizof_chunk` is set to the same number, 1000, and `numof_chunk` can be set to anything. If it is set to 1, the calculation is exactly the same as above (and takes about 6 seconds), but in this example it is set to 100, meaning that we tackle a problem 100 times larger than before. The code reuses the same size of input list (1000) and fills up `np_array` (which is created up front at its full size via `np.zeros`) by consecutive slices `np_array[beg:end]`. The line `iter = [x[1] for x in tuples]` extracts just the returned values from the `test_parallel` output object, and `np_array[beg:end] = np.fromiter(iter, np.uint8)` then fills the array with these values. Note that with NumPy arrays one can define the data type; in this example `np.uint8` is an 8-bit (1-byte) unsigned integer, which takes far less storage per element than a corresponding Python list would.
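The same chunked-fill pattern also works when the inputs come from a true generator rather than `range`: pull a fixed-size block with itertools.islice, process it, and write it into the preallocated array. A plain-Python sketch under made-up sizes (the `f` below is a cheap stand-in for the real parallel computation):

```python
import numpy as np
from itertools import islice

def chunks(gen, size):
    """Pull successive lists of at most `size` items lazily from `gen`."""
    while True:
        block = list(islice(gen, size))
        if not block:
            return
        yield block

def f(x):
    return x % 7  # cheap stand-in for the real parallel computation

total = 10 ** 4
gen = iter(range(total))          # any generator works here, e.g. file input
out = np.zeros(total, dtype=np.uint8)
pos = 0
for block in chunks(gen, 1000):
    # fill the preallocated array slice by slice, as in the code above
    out[pos:pos + len(block)] = np.fromiter((f(x) for x in block), np.uint8)
    pos += len(block)
print(out[:10])  # -> [0 1 2 3 4 5 6 0 1 2]
```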
Altogether, varying `sizof_chunk`, `numof_chunk`, and of course `ncpus` can help solve a lot of these kinds of problems.
Istvan
Tue, 21 Jun 2016 23:18:16 +0200
https://ask.sagemath.org/question/9886/calling-a-parallel-decorated-function-on-an-iterator/?answer=33876#post-id-33876