Многопроцессорная обработка Python и общий счетчик - PullRequest
48 голосов
/ 17 января 2010

У меня проблемы с многопроцессорным модулем. Я использую пул рабочих с его методом map для загрузки данных из большого количества файлов, и для каждого из них я анализирую данные с помощью специальной функции. Каждый раз, когда файл обрабатывается, я хочу обновить счетчик, чтобы отслеживать, сколько файлов осталось обработать. Вот пример кода:

def analyze_data( args ):
    # do something 
    counter += 1
    print counter


if __name__ == '__main__':

    list_of_files = os.listdir(some_directory)

    global counter
    counter = 0

    p = Pool()
    p.map(analyze_data, list_of_files)

Я не могу найти решение для этого.

Ответы [ 3 ]

55 голосов
/ 17 января 2010

Проблема в том, что переменная counter не используется всеми вашими процессами: каждый отдельный процесс создает собственный локальный экземпляр и увеличивает его.

См. в этом разделе документации по некоторым методам, которые вы можете использовать для обмена состоянием между вашими процессами. В вашем случае вы можете поделиться экземпляром Value между вашими работниками

Вот рабочая версия вашего примера (с некоторыми фиктивными входными данными). Обратите внимание, что он использует глобальные значения, которых я бы на практике старался избегать:

from multiprocessing import Pool, Value
from time import sleep

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print counter.value
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    #
    p = Pool(initializer = init, initargs = (counter, ))
    i = p.map_async(analyze_data, inputs, chunksize = 1)
    i.wait()
    print i.get()
29 голосов
/ 10 февраля 2014

Класс счетчика без ошибки состояния гонки:

class Counter(object):
    def __init__(self):
        self.val = multiprocessing.Value('i', 0)

    def increment(self, n=1):
        with self.val.get_lock():
            self.val.value += n

    @property
    def value(self):
        return self.val.value
2 голосов
/ 30 ноября 2017

Класс Faster Counter без использования дважды встроенной блокировки Value

class Counter(object):
    def __init__(self, initval=0):
        self.val = multiprocessing.RawValue('i', initval)
        self.lock = multiprocessing.Lock()

    def increment(self):
        with self.lock:
            self.val.value += 1

    @property
    def value(self):
        return self.val.value

https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...