Python中的进程池(Pool)与全局变量

Python有两大类并行方式:多线程与多进程。由于GIL的存在,这两种方式有着截然不同的特点:

  • 多线程可以直接共享数据,但至多只能用一个CPU核
  • 多进程可以用多个CPU核,但各进程的数据相互独立(可shared_memory等机制共享数据)

multiprocessing包中的Pool类是实现多进程的常用工具。从直觉上看,以下代码中,由于每个进程都具有独立的内存,对i进行修改不会有任何效果,所有打印出来的i应该都是1

1
2
3
4
5
6
7
8
9
10
11
12
13
import os
from multiprocessing import Pool

i = 0

def pp(job_id):
global i
i = i + 1
print(f"[{job_id:02}]: {i}, from {os.getpid()}")

if __name__ == "__main__":
with Pool(4) as p:
p.map(pp, range(9))

然而,实际运行这段代码,i可以一直打印到10!这是什么原因呢?

以下是上述程序的输出:

1
2
3
4
5
6
7
8
9
[00]: 1, from 23600
[01]: 2, from 23600
[02]: 3, from 23600
[03]: 4, from 23600
[04]: 5, from 23600
[05]: 6, from 23600
[06]: 7, from 23600
[07]: 8, from 23600
[08]: 9, from 23600

可以看到,与预期的至少四个进程执行各个pp函数不同,所有的pp函数都是一个进程执行的。如果要达到预想的由不同的进程执行不同的pp函数,所有打印出的i都是1的效果,实际上需要做两点改动:

  • 设置Pool实例的maxtasksperchild=1
  • 设置map函数的参数chunksize=1
    也就说main部分应该这样写:
    1
    2
    3
    if __name__ == "__main__":
    with Pool(4, maxtasksperchild=1) as p:
    p.map(pp, range(9), chunksize=1)

什么是maxtasksperchild

关于maxtasksperchild官方文档专门有一个Note:

通常来说,Pool 中的 Worker 进程的生命周期和进程池的工作队列一样长。一些其他系统中(如 Apache, mod_wsgi 等)也可以发现另一种模式,他们会让工作进程在完成一些任务后退出,清理、释放资源,然后启动一个新的进程代替旧的工作进程。 Pool 的 maxtasksperchild 参数给用户提供了这种能力。

简单来说,就是Pool中每个进程并不是完成一个任务之后就退出的,而是完成一个任务后,不退出Python,继续完成下一个任务。这样,不同的任务之间就会通过Python全局变量共享一部分数据。设置maxtasksperchild=1的效果,就是让每个进程完成任务之后强行退出,然后另起一个进程完成下一个任务。这样不同的任务之间就不会再共享数据了。

什么是chunksize

另一个chunksize参数相对来说比较常见,即上文中的“任务”所包含的可迭代对象数。也就是说,每个进程可以完成的“任务”单元,可能也包含了多个可迭代对象。而由于执行同一个任务的自然是同一个进程,不同的可迭代对象之间也就可以通过全局变量共享数据。设置chunksize大于1的主要目的是减少不必要的进程创建,提高效率。当chucksize没有指定时,Python会根据进程任务量的大小自动设置该参数,以最大化效率。

一个栗子

当设置maxtasksperchild=2chunksize=3时,示例程序的一次输出如下:

1
2
3
4
5
6
7
8
9
[00]: 1, from 20508
[01]: 2, from 20508
[02]: 3, from 20508
[03]: 1, from 15780
[04]: 2, from 15780
[05]: 3, from 15780
[06]: 4, from 20508
[07]: 5, from 20508
[08]: 6, from 20508

可以看到,Pool中起了两个进程2050815780来执行任务,每个进程一次处理3个数字。而20508进程在完成[0, 1, 2]的迭代之后,转头又去迭代[6, 7, 8]去了。

Lessons

  • 使用Pool时,牢记各个“进程”之间其实并不独立
  • 应尽可能保持全局变量是常值