Niles is right. I also ran into this problem recently but found a straightforward way to solve it. I have some sample code below with a detailed explanation. First, let's look at a simple comparison between serial and parallel execution of a function.

@parallel(p_iter='multiprocessing', ncpus=3)
def test_parallel(n):
  # number of distinct prime factors of 2^128+n
  f = factor(2^128+n)
  return len(f)

t=walltime()
r = range(1000)
p = sorted(list( test_parallel(r)))
print p[-1]
print walltime(t)

t=walltime()
for i in range(1000):
  f = factor(2^128+i)
print f
print walltime(t)

(((999,), {}), 5)
6.359593153
5 * 23 * 383 * 1088533 * 7097431848554855065803619703
17.0849101543

test_parallel is a simple function that takes a nontrivial time to execute, for testing purposes: it returns the number of distinct prime factors of 2^128+n. Its argument is a list created by the range function. Note that this has to be an actual list, there is currently no alternative, so e.g. xrange cannot be used in place of range, because xrange generates numbers on the fly rather than building the whole list up front. This can be a serious problem (mainly a memory problem), but it can be overcome, as shown further below.

Like any @parallel-decorated function, test_parallel returns a special object: an iterator over 2-tuples, and the order of the 2-tuples is entirely random! So the output (((999,), {}), 5) above, representing the last item of the calculation p[-1], consists of the input value n=999, an empty keyword-argument dictionary {}, and the return value 5. Note that in order to parse the output of test_parallel conveniently, it should first be converted to a sorted list, as done above.
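
For instance, here is a minimal sketch of picking those 2-tuples apart (square is just a toy function for illustration, not part of the code above):

@parallel(ncpus=2)
def square(n):
  return n^2

# the result object is an iterator over ((args, kwargs), value) 2-tuples
# that arrive in arbitrary order, so sort before relying on positions
results = sorted(list(square(list(range(5)))))
print(results[0])                            # (((0,), {}), 0)
inputs = [ r[0][0][0] for r in results ]     # the input values n
values = [ r[1] for r in results ]           # the returned values
print(inputs)                                # [0, 1, 2, 3, 4]
print(values)                                # [0, 1, 4, 9, 16]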

In this particular run the parallel calculation (using 3 cores) took some 6 seconds, whereas the serial equivalent at the bottom of the sample code took some 17 seconds to execute (and the printed result of the last factorization confirms that there were indeed 5 distinct prime factors).

This is all well and good, and my great appreciation goes to the developers. Unfortunately, a serious problem arises when the list argument of a parallel function grows too big. One solution that has worked very well for me combines chunks, as Niles suggested, with numpy arrays. The following code is a more robust and significantly more efficient alternative to the naive parallel code above.

%time
import numpy as np

sizof_chunk = 10^3    # items per chunk (same list size as above)
numof_chunk = 10^2    # number of chunks
np_array    = np.zeros((sizof_chunk*numof_chunk,), dtype=np.uint8)

for i in range(numof_chunk):

  beg    = i *   sizof_chunk
  end    = beg + sizof_chunk
  tuples = sorted(list(test_parallel(range(beg,end))))
  values = [ x[1] for x in tuples ]    # keep only the returned values
  np_array[beg:end] = np.fromiter(values, np.uint8)

print np_array

[1 2 3 ..., 6 8 3]
CPU time: 13.88 s,  Wall time: 670.06 s

sizof_chunk is set to the same number, 1000, and numof_chunk can be set to anything. If it is set to 1, the calculation is exactly the same as above (and takes about 6 seconds), but in this example it is set to 100, meaning that we tackle a problem 100 times larger than before. The code reuses a list of the same size (1000) in every iteration and fills up np_array (which is created up front at its full size via np.zeros) slice by slice through np_array[beg:end]. values = [ x[1] for x in tuples ] extracts just the returned values from the test_parallel output object, and np_array[beg:end] = np.fromiter(values, np.uint8) then fills the corresponding slice of the array with them. Note that with numpy arrays one can choose the data type: here each result is stored as an 8-bit unsigned integer (np.uint8), which takes far less memory than a corresponding Python list would.
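
To get a feel for the storage difference, a rough comparison along these lines can be made (the exact byte counts depend on the platform and Python version, and small integers are cached, so treat the numbers as indicative only):

import sys
import numpy as np

n   = 10^5
arr = np.zeros((n,), dtype=np.uint8)
lst = [int(0)]*n

print(arr.nbytes)                                    # 100000 bytes, i.e. 1 byte per entry
print(sys.getsizeof(lst) + n*sys.getsizeof(int(0)))  # list overhead plus one int object per entry: tens of bytes each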

Altogether, varying sizof_chunk, numof_chunk, and of course ncpus can help solve a lot of these kinds of problems.
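
As a possible refinement (not part of the original code; the name parallel_chunked and its signature are just one way to do it), the whole pattern can be wrapped into a small helper so that the chunk size and the number of chunks become parameters:

import numpy as np

def parallel_chunked(pfunc, sizof_chunk, numof_chunk, dtype=np.uint8):
    # pfunc must be a @parallel-decorated function whose return values
    # fit into the chosen numpy dtype
    out = np.zeros((sizof_chunk*numof_chunk,), dtype=dtype)
    for i in range(numof_chunk):
        beg = i * sizof_chunk
        end = beg + sizof_chunk
        tuples = sorted(list(pfunc(list(range(beg, end)))))
        out[beg:end] = np.fromiter((x[1] for x in tuples), dtype)
    return out

# e.g. the same run as above:
# np_array = parallel_chunked(test_parallel, 10^3, 10^2)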

Istvan