为什么在python map()和multiprocessing.Pool.map()中得到不同的答案?
问题内容:
我有一个奇怪的问题。我有一个格式的文件:
START
1
2
STOP
lllllllll
START
3
5
6
STOP
和我想读之间的线START
和STOP
块,而使用my_f
来处理每个块。
def block_generator(file):
with open(file) as lines:
for line in lines:
if line == 'START':
block=itertools.takewhile(lambda x:x!='STOP',lines)
yield block
在我的主要职能中,我试图用来map()
完成工作。有效。
blocks=block_generator(file)
map(my_f,blocks)
会给我我想要的 但是当我尝试与进行相同的操作时
multiprocessing.Pool.map()
,它给了我一个错误,说takewhile()想接受2个参数,给了0。
blocks=block_generator(file)
p=multiprocessing.Pool(4)
p.map(my_f,blocks)
这是一个错误吗?
- 该文件有1000000多个块,每个块少于100行。
- 我接受未填写的答案表格。
- 但是也许我会简单地拆分文件并使用原始脚本的n个实例,而无需进行多处理来处理它们,然后将结果汇总在一起。这样,只要脚本适用于小文件,您就不会出错。
问题答案:
怎么样:
import itertools
def grouper(n, iterable, fillvalue=None):
# Source: http://docs.python.org/library/itertools.html#recipes
"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
return itertools.izip_longest(*[iter(iterable)]*n,fillvalue=fillvalue)
def block_generator(file):
with open(file) as lines:
for line in lines:
if line == 'START':
block=list(itertools.takewhile(lambda x:x!='STOP',lines))
yield block
blocks=block_generator(file)
p=multiprocessing.Pool(4)
for chunk in grouper(100,blocks,fillvalue=''):
p.map(my_f,chunk)
使用grouper
会限制占用的文件量p.map
。因此,无需将整个文件立即读入内存(送入任务队列)。
我在上面声明,当您调用时p.map(func,iterator)
,整个迭代器会立即被消耗掉以填充任务队列。然后,池工作人员从队列中获取任务并同时处理作业。
如果查看pool.py并跟踪定义,您将看到_handle_tasks
线程从中获取项目self._taskqueue
,并立即枚举:
for i, task in enumerate(taskseq):
...
put(task)
结论是,传递给的迭代器立即p.map
被消耗。从队列中获取下一个任务之前,无需等待一个任务结束。
作为进一步的佐证,如果运行此命令:
示范代码:
import multiprocessing as mp
import time
import logging
def foo(x):
time.sleep(1)
return x*x
def blocks():
for x in range(1000):
if x%100==0:
logger.info('Got here')
yield x
logger=mp.log_to_stderr(logging.DEBUG)
logger.setLevel(logging.DEBUG)
pool=mp.Pool()
print pool.map(foo, blocks())
您Got here
几乎会立即看到该消息打印了10次,然后由于time.sleep(1)
打进来而暂停了很长时间foo
。这明显表明迭代器在池进程完成任务之前很久就被完全消耗。