Многопроцессорный пул map_async для одной функции, затем блок перед следующей (python 3) - PullRequest
0 голосов
/ 15 сентября 2018

Пожалуйста, обратите внимание, что этот демонстрационный код генерирует данные в несколько ГБ.

В течение некоторого времени я использовал версии приведенного ниже кода для многопроцессорной обработки.Это работает хорошо, когда время выполнения каждого процесса в пуле одинаково, но если один процесс занимает намного больше времени, я получаю много заблокированных процессов, ожидающих одного, поэтому я пытаюсь заставить его работать асинхронно - только для одной функции ввремя.

Например, если у меня 70 ядер и мне нужно 2000 раз запустить функцию, я хочу, чтобы она выполнялась асинхронно, а затем дождаться последнего процесса, прежде чем вызывать следующую функцию.В настоящее время он просто отправляет процессы партиями того количества ядер, которое я ему предоставляю, и каждый пакет должен ждать самого длинного процесса.

Как вы можете видеть, я пытался использовать map_async, но это явно неправильный синтаксис.Кто-нибудь может мне помочь?

import os
p='PATH/test/'

def f1(tup):
    x,y=tup
    to_write = x*(y**5)
    with open(p+x+str(y)+'.txt','w') as fout:
        fout.write(to_write)

def f2(tup):
    x,y=tup
    print (os.path.exists(p+x+str(y)+'.txt'))

def call_func(f,nos,threads,call):
    print (call)
    for i in range(0, len(nos), threads):
        print (i)
        chunk = nos[i:i + threads]
        tmp = [('args', no) for no in chunk]
        pool.map(f, tmp)
        #pool.map_async(f, tmp)

nos=[i for i in range(55)]
threads=8
if __name__ == '__main__':
with Pool(processes=threads) as pool:
    call_func(f1,nos,threads,'f1')
    call_func(f2,nos,threads,'f2')

1 Ответ

0 голосов
/ 15 сентября 2018

map будет возвращаться только, а map_async будет вызывать обратный вызов только после выполнения всех задач текущего блока.

Таким образом, вы можете либо передать все задачи только на map / map_async сразу, либо использовать apply_async (первоначально называемый threads раз), где callback вызывает apply_async для следующей задачи.

Если фактические возвращаемые значения вызова не имеют значения (или, по крайней мере, их порядок не имеет значения), imap_unordered может быть другим эффективным решением, когда он выполняет все задачи одновременно (или итератор / генератор, создающий задачи). по запросу)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...