Multiprocessing in Python

Python is not the most natural fit for parallel programming, but running work in parallel is sometimes useful.

For heavy computation, we are probably better off using something like Dask or Numba. For other kinds of work, Python has a built-in solution: the multiprocessing module.

We’ll stick with computations for the examples, though, since they are simpler.

Comparison of parallel and not parallel

A quick illustration of why parallelization is great when the problem is embarrassingly parallel.

Not parallel

import time

def square(x):
    time.sleep(1)  # simulate one second of work
    return pow(x, 2)

if __name__ == '__main__':
    started_at = time.time()

    xs = range(10)
    results = [square(x) for x in xs]  # calls run one after another
    print(results)

    finished_at = time.time()
    print(f"Elapsed: {(finished_at - started_at):f}")

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Elapsed: 10.007492

Parallel

import time
from multiprocessing import Pool

def square(x):
    time.sleep(1)  # simulate one second of work
    return pow(x, 2)

if __name__ == '__main__':
    started_at = time.time()
    xs = range(10)

    with Pool(5) as p:  # five worker processes
        results = p.map(square, xs)  # results come back in input order
    print(results)

    finished_at = time.time()
    print(f"Elapsed: {(finished_at - started_at):f}")

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Elapsed: 2.180675
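
The drop from about ten seconds to about two follows from simple arithmetic: ten one-second tasks shared among five workers run in two rounds. Here is a rough sketch of that estimate, assuming every task takes the same time and ignoring process start-up and communication overhead (the helper name estimate_elapsed is made up for illustration):

```python
import math

def estimate_elapsed(n_tasks, n_workers, seconds_per_task):
    # Hypothetical helper: idealized wall-clock time when equal-length tasks
    # are handed out one at a time to a fixed number of workers.
    rounds = math.ceil(n_tasks / n_workers)
    return rounds * seconds_per_task

if __name__ == '__main__':
    print(estimate_elapsed(10, 1, 1.0))  # sequential version
    print(estimate_elapsed(10, 5, 1.0))  # Pool(5) version
```

The measured time is slightly above this idealized figure, presumably because starting worker processes and passing data between them is not free.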

Limitations of multiprocessing

If it’s a simple case, like the one above, everything works fine. However, multiprocessing relies on pickle, which requires [1] that

[…] picklable functions and classes must be defined in the top level of a module.

This makes it impossible to parallelize functions that are created dynamically, such as a nested function returned by a factory. For example, the following code will not run:

import time
from multiprocessing import Pool

def power_factory(y):
    # f is a local (nested) function, so pickle cannot serialize it
    def f(x):
        time.sleep(1)
        return pow(x, y)
    return f

if __name__ == '__main__':
    started_at = time.time()

    square = power_factory(2)
    xs = range(10)
    with Pool(5) as p:
        results = p.map(square, xs)  # fails: square cannot be pickled
    print(results)

    finished_at = time.time()
    print(f"Elapsed: {(finished_at - started_at):f}")

It throws the following exception:

AttributeError: Can't pickle local object 'power_factory.<locals>.f'
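
The failure can be reproduced without any pool, which shows that the restriction comes from pickle itself. Here is a minimal sketch; the helper can_pickle and the function top_level_square are names introduced only for this illustration, and the exception type is caught broadly because it may differ between Python versions:

```python
import pickle

def power_factory(y):
    def f(x):
        return pow(x, y)
    return f

def top_level_square(x):
    return pow(x, 2)

def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (AttributeError, pickle.PicklingError):
        return False

if __name__ == '__main__':
    print(can_pickle(top_level_square))  # function defined at module level
    print(can_pickle(power_factory(2)))  # nested function from the factory
```

The first check should succeed and the second should fail, matching the exception above.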

Overcoming limitations

One way to solve the issue is to rewrite the code so that it does not need dynamically created functions. The simple example above could be rewritten using apply_async or starmap, for instance.

apply_async

import time
from multiprocessing import Pool

def slow_pow(x, y):
    time.sleep(1)
    return pow(x, y)

if __name__ == '__main__':
    started_at = time.time()

    xs = range(10)
    results = []
    with Pool(5) as p:
        for x in xs:
            # submit each task without waiting; returns an AsyncResult
            results.append(p.apply_async(slow_pow, (x, 2)))
        # collect the values while the pool is still alive
        results = [r.get() for r in results]
    print(results)

    finished_at = time.time()
    print(f"Elapsed: {(finished_at - started_at):f}")

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Elapsed: 2.039259

starmap

import time
from multiprocessing import Pool

def slow_pow(x, y):
    time.sleep(1)
    return pow(x, y)

if __name__ == '__main__':
    started_at = time.time()

    xs = range(10)
    with Pool(5) as p:
        # each (x, 2) tuple is unpacked into slow_pow(x, 2)
        results = p.starmap(slow_pow, zip(xs, [2]*len(xs)))
    print(results)

    finished_at = time.time()
    print(f"Elapsed: {(finished_at - started_at):f}")

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Elapsed: 2.026638
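
functools.partial

A third rewrite, not among the examples above, keeps plain map: functools.partial fixes the exponent of a top-level function, and the resulting object can be pickled because it refers to slow_pow by name. Here is a minimal sketch, assuming the same slow_pow as in the previous examples:

```python
import time
from functools import partial
from multiprocessing import Pool

def slow_pow(x, y):
    time.sleep(1)
    return pow(x, y)

# A partial object is picklable as long as the wrapped function
# is defined at the top level of a module.
square = partial(slow_pow, y=2)

if __name__ == '__main__':
    started_at = time.time()

    xs = range(10)
    with Pool(5) as p:
        results = p.map(square, xs)
    print(results)

    finished_at = time.time()
    print(f"Elapsed: {(finished_at - started_at):f}")
```

This only helps when the dynamic part of the function can be expressed as fixed arguments; arbitrary closures still need a different approach.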

Pathos

However, if we cannot rewrite the code, the Pathos package has a solution. Pathos aims to make parallel computing easier and, among other things, offers “better multiprocessing and multi-threading in Python” [2] in its pathos.multiprocessing module. It serializes with dill rather than pickle, so nested functions can be sent to worker processes. The power_factory example that failed above can be run with pathos with minimal changes:

import time
from pathos.multiprocessing import ProcessPool  # different import

def power_factory(y):
    def f(x):
        time.sleep(1)
        return pow(x, y)
    return f

if __name__ == '__main__':
    started_at = time.time()

    square = power_factory(2)  # nested function, same as in the failing example
    xs = range(10)
    with ProcessPool(5) as p:  # different name for Pool
        results = p.map(square, xs)
    print(results)

    finished_at = time.time()
    print(f"Elapsed: {(finished_at - started_at):f}")

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Elapsed: 2.019734

References