Многопроцессорная обработка файла в Python с последующей записью результата на диск - PullRequest
0 голосов
/ 26 марта 2019

Я хотел бы сделать следующее:

  • чтение данных из файла CSV
  • обработка каждой строки указанного CSV (при условии, что это длинная сетевая операция)
  • записать в другой файл результат

Я попытался склеить этот и этот ответы, но с недостаточным успехом.Код для второй очереди никогда не вызывается, поэтому запись на диск не происходит.Как я могу сообщить процессу, что существует вторая очередь?

Обратите внимание, что мне не нужен поклонник multiprocessing.Если async / await работает лучше, я за это.

Мой код пока

import multiprocessing
import os
import time

in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()

def worker_main(in_queue, out_queue):
    print (os.getpid(), "working")
    while True:
        item = in_queue.get(True)
        print (os.getpid(), "got", item)
        time.sleep(1) #long network processing
        print (os.getpid(), "done", item)
        # put the processed items to be written to disl
        out_queue.put("processed:" + str(item))


pool = multiprocessing.Pool(3, worker_main,(in_queue,out_queue))

for i in range(5): # let's assume this is the file reading part
    in_queue.put(i)

with open('out.txt', 'w') as file:

    while not out_queue.empty():
        try:
            value = q.get(timeout = 1)
            file.write(value + '\n')
        except Exception as qe:
            print ("Empty Queue or dead process")

1 Ответ

2 голосов
/ 26 марта 2019

Первая проблема, с которой я столкнулся при попытке выполнить ваш код:

An attempt has been made to start a new process before the current process has finished 
its bootstrapping phase. This probably means that you are not using fork to start your 
child processes and you have forgotten to use the proper idiom in the main module

Я должен был обернуть любые инструкции области видимости модуля в идиому if __name__ == '__main__':. Подробнее здесь .

Поскольку ваша цель состоит в том, чтобы перебирать строки файла, Pool.imap() кажется подходящим вариантом. Документы imap() относятся к документам map(), с той разницей, что imap() лениво извлекает следующие элементы из итерируемого (который в вашем случае будет файл CSV), что будет полезно, если ваш файл CSV большой. Так из map() документов:

Этот метод разбивает итерируемое на несколько кусков, которые он отправляется в пул процессов как отдельные задачи.

imap() возвращает итератор, так что вы можете перебирать результаты, полученные рабочими процесса, чтобы делать то, что вы должны с ними делать (в нашем примере это запись результатов в файл).

Вот рабочий пример:

import multiprocessing
import os
import time


def worker_main(item):
    print(os.getpid(), "got", item)
    time.sleep(1) #long network processing
    print(os.getpid(), "done", item)
    # put the processed items to be written to disl
    return "processed:" + str(item)


if __name__ == '__main__':
    with multiprocessing.Pool(3) as pool:
        with open('out.txt', 'w') as file:
            # range(5) simulating a 5 row csv file.
            for proc_row in pool.imap(worker_main, range(5)):
                file.write(proc_row + '\n')

# printed output:
# 1368 got 0
# 9228 got 1
# 12632 got 2
# 1368 done 0
# 1368 got 3
# 9228 done 1
# 9228 got 4
# 12632 done 2
# 1368 done 3
# 9228 done 4

out.txt выглядит так:

processed:0
processed:1
processed:2
processed:3
processed:4

Обратите внимание, что мне также не приходилось использовать какие-либо очереди.

...