为什么在python map()和multiprocessing.Pool.map()中得到不同的答案?


问题内容

我有一个奇怪的问题。我有一个格式的文件:

START
1
2
STOP
lllllllll
START
3
5
6
STOP

和我想读之间的线STARTSTOP块,而使用my_f来处理每个块。

def block_generator(file):

with open(file) as lines:
    for line in lines:
        if line == 'START': 
            block=itertools.takewhile(lambda x:x!='STOP',lines) 
            yield block

在我的主要职能中,我试图用来map()完成工作。有效。

blocks=block_generator(file)
map(my_f,blocks)

会给我我想要的 但是当我尝试与进行相同的操作时
multiprocessing.Pool.map(),它给了我一个错误,说takewhile()想接受2个参数,给了0。

    blocks=block_generator(file)
    p=multiprocessing.Pool(4) 
    p.map(my_f,blocks)

这是一个错误吗?

  1. 该文件有1000000多个块,每个块少于100行。
  2. 我接受未填写的答案表格。
  3. 但是也许我会简单地拆分文件并使用原始脚本的n个实例,而无需进行多处理来处理它们,然后将结果汇总在一起。这样,只要脚本适用于小文件,您就不会出错。

问题答案:

怎么样:

import itertools

def grouper(n, iterable, fillvalue=None):
    # Source: http://docs.python.org/library/itertools.html#recipes
    "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
    return itertools.izip_longest(*[iter(iterable)]*n,fillvalue=fillvalue)

def block_generator(file):
    with open(file) as lines:
        for line in lines:
            if line == 'START': 
                block=list(itertools.takewhile(lambda x:x!='STOP',lines))
                yield block

blocks=block_generator(file)
p=multiprocessing.Pool(4)
for chunk in grouper(100,blocks,fillvalue=''):
    p.map(my_f,chunk)

使用grouper会限制占用的文件量p.map。因此,无需将整个文件立即读入内存(送入任务队列)。


我在上面声明,当您调用时p.map(func,iterator),整个迭代器会立即被消耗掉以填充任务队列。然后,池工作人员从队列中获取任务并同时处理作业。

如果查看pool.py并跟踪定义,您将看到_handle_tasks线程从中获取项目self._taskqueue,并立即枚举:

         for i, task in enumerate(taskseq):
             ...
             put(task)

结论是,传递给的迭代器立即p.map被消耗。从队列中获取下一个任务之前,无需等待一个任务结束。

作为进一步的佐证,如果运行此命令:

示范代码:

import multiprocessing as mp
import time
import logging

def foo(x):
    time.sleep(1)
    return x*x

def blocks():
    for x in range(1000):
        if x%100==0:
            logger.info('Got here')
        yield x

logger=mp.log_to_stderr(logging.DEBUG)
logger.setLevel(logging.DEBUG) 
pool=mp.Pool() 
print pool.map(foo, blocks())

Got here几乎会立即看到该消息打印了10次,然后由于time.sleep(1)打进来而暂停了很长时间foo。这明显表明迭代器在池进程完成任务之前很久就被完全消耗。