Пожалуйста, обратите внимание, что этот демонстрационный код генерирует данные в несколько ГБ.
В течение некоторого времени я использовал версии приведенного ниже кода для многопроцессорной обработки.Это работает хорошо, когда время выполнения каждого процесса в пуле одинаково, но если один процесс занимает намного больше времени, я получаю много заблокированных процессов, ожидающих одного, поэтому я пытаюсь заставить его работать асинхронно - только для одной функции ввремя.
Например, если у меня 70 ядер и мне нужно 2000 раз запустить функцию, я хочу, чтобы она выполнялась асинхронно, а затем дождаться последнего процесса, прежде чем вызывать следующую функцию.В настоящее время он просто отправляет процессы партиями того количества ядер, которое я ему предоставляю, и каждый пакет должен ждать самого длинного процесса.
Как вы можете видеть, я пытался использовать map_async, но это явно неправильный синтаксис.Кто-нибудь может мне помочь?
import os
p='PATH/test/'
def f1(tup):
x,y=tup
to_write = x*(y**5)
with open(p+x+str(y)+'.txt','w') as fout:
fout.write(to_write)
def f2(tup):
x,y=tup
print (os.path.exists(p+x+str(y)+'.txt'))
def call_func(f,nos,threads,call):
print (call)
for i in range(0, len(nos), threads):
print (i)
chunk = nos[i:i + threads]
tmp = [('args', no) for no in chunk]
pool.map(f, tmp)
#pool.map_async(f, tmp)
nos=[i for i in range(55)]
threads=8
if __name__ == '__main__':
with Pool(processes=threads) as pool:
call_func(f1,nos,threads,'f1')
call_func(f2,nos,threads,'f2')