Запись в файл с многопроцессорной обработкой - PullRequest
11 голосов
/ 29 июня 2011

У меня возникла следующая проблема в Python.

Мне нужно сделать несколько расчетов параллельно, результаты которых мне нужно записать последовательно в файл.Поэтому я создал функцию, которая получает multiprocessing.Queue и дескриптор файла, выполняет вычисления и печатает результат в файле:

import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation   

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file

def work(queue, fh):
while True:
    try:
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
    except:
        break


if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fh = open("foo", "w")
    workQueue = Queue()
    parList = # list of conditions for which I want to run doCalculation()
    for x in parList:
        workQueue.put(x)
    processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
    for p in processes:
       p.start()
    for p in processes:
       p.join()
    fh.close()

Но файл заканчивается пустым после запуска сценария.Я попытался изменить функцию worker () на:

def work(queue, filename):
while True:
    try:
        fh = open(filename, "a")
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
        fh.close()
    except:
        break

и передать имя файла в качестве параметра.Тогда это работает, как я и предполагал.Когда я пытаюсь сделать то же самое последовательно, без многопроцессорной обработки, это также работает нормально.

Почему это не сработало в первой версии?Я не вижу проблемы.

Также: могу ли я гарантировать, что два процесса не будут пытаться записать файл одновременно?


РЕДАКТИРОВАТЬ:

Спасибо.Я понял.Это рабочая версия:

import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform

def doCalculation(par):
    t = uniform(0,2)
    sleep(t)
    return par * par  # just to simulate some calculation

def feed(queue, parlist):
    for par in parlist:
            queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "dealing with ", par, "" 
            res = doCalculation(par)
            queueOut.put((par,res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()

if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fname = "foo"
    workerQueue = Queue()
    writerQueue = Queue()
    parlist = [1,2,3,4,5,6,7,8,9,10]
    feedProc = Process(target = feed , args = (workerQueue, parlist))
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target = write, args = (writerQueue, fname))


    feedProc.start()
    for p in calcProc:
        p.start()
    writProc.start()

    feedProc.join ()
    for p in calcProc:
        p.join()
    writProc.join ()

Ответы [ 3 ]

16 голосов
/ 29 июня 2011

Вы действительно должны использовать две очереди и три отдельных вида обработки.

  1. Поместить материал в очередь № 1.

  2. Получить вещи из очереди # 1 и делать вычисления, помещая вещи в очередь # 2. Их может быть много, поскольку они попадают из одной очереди и безопасно помещаются в другую.

  3. Получите материал из очереди # 2 и запишите его в файл. Вы должны иметь ровно 1 из них и не более. Он «владеет» файлом, гарантирует атомарный доступ и абсолютно гарантирует, что файл записан чисто и согласованно.

5 голосов
/ 15 апреля 2015

Если кто-то ищет простой способ сделать то же самое, это может вам помочь.Я не думаю, что есть какие-то недостатки, чтобы сделать это таким образом.Если есть, пожалуйста, сообщите мне.

import multiprocessing 
import re

def mp_worker(item):
    # Do something
    return item, count

def mp_handler():
    cpus = multiprocessing.cpu_count()
    p = multiprocessing.Pool(cpus)
    # The below 2 lines populate the list. This listX will later be accessed parallely. This can be replaced as long as listX is passed on to the next step.
    with open('ExampleFile.txt') as f:
        listX = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, listX):
            # (item, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__=='__main__':
    mp_handler()

Источник: Python: запись в один файл с очередью при использовании многопроцессорного пула

0 голосов
/ 27 февраля 2017

В коде записи работника есть ошибка: если блок ложен, работник никогда не получит никаких данных.Должно быть следующим образом:

par, res = queue.get(block = True)

Вы можете проверить это, добавив строку

 print "QSize",queueOut.qsize()

после queueOut.put((par,res))

С блоком = False вы получите когда-либоувеличивая длину очереди, пока она не заполнится, в отличие от block = True, где вы всегда получаете «1».

...