Первая проблема, с которой я столкнулся при попытке выполнить ваш код:
An attempt has been made to start a new process before the current process has finished
its bootstrapping phase. This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom in the main module
Я должен был обернуть любые инструкции области видимости модуля в идиому if __name__ == '__main__':
. Подробнее здесь .
Поскольку ваша цель состоит в том, чтобы перебирать строки файла, Pool.imap()
кажется подходящим вариантом. Документы imap()
относятся к документам map()
, с той разницей, что imap()
лениво извлекает следующие элементы из итерируемого (который в вашем случае будет файл CSV), что будет полезно, если ваш файл CSV большой. Так из map()
документов:
Этот метод разбивает итерируемое на несколько кусков, которые он
отправляется в пул процессов как отдельные задачи.
imap()
возвращает итератор, так что вы можете перебирать результаты, полученные рабочими процесса, чтобы делать то, что вы должны с ними делать (в нашем примере это запись результатов в файл).
Вот рабочий пример:
import multiprocessing
import os
import time
def worker_main(item):
print(os.getpid(), "got", item)
time.sleep(1) #long network processing
print(os.getpid(), "done", item)
# put the processed items to be written to disl
return "processed:" + str(item)
if __name__ == '__main__':
with multiprocessing.Pool(3) as pool:
with open('out.txt', 'w') as file:
# range(5) simulating a 5 row csv file.
for proc_row in pool.imap(worker_main, range(5)):
file.write(proc_row + '\n')
# printed output:
# 1368 got 0
# 9228 got 1
# 12632 got 2
# 1368 done 0
# 1368 got 3
# 9228 done 1
# 9228 got 4
# 12632 done 2
# 1368 done 3
# 9228 done 4
out.txt
выглядит так:
processed:0
processed:1
processed:2
processed:3
processed:4
Обратите внимание, что мне также не приходилось использовать какие-либо очереди.