Избегайте глобальных переменных для необратимого общего состояния среди многопроцессорных. Работники пула - PullRequest
0 голосов
/ 02 октября 2018

Я часто пишу программы на Python, которые создают большую (мегабайтную) структуру данных только для чтения, а затем используют эту структуру данных для анализа очень большого (всего сотни мегабайт) списка небольших записей.Каждая из записей может быть проанализирована параллельно, поэтому естественным способом является установка структуры данных только для чтения и присвоение ее глобальной переменной, а затем создание multiprocessing.Pool (которое неявно копирует данныеструктурировать в каждый рабочий процесс через fork), а затем использовать imap_unordered для параллельного сжатия записей.Скелет этого шаблона имеет тенденцию выглядеть следующим образом:

classifier = None
def classify_row(row):
    return classifier.classify(row)

def classify(classifier_spec, data_file):
    global classifier
    try:
        classifier = Classifier(classifier_spec)
        with open(data_file, "rt") as fp, \
             multiprocessing.Pool() as pool:
            rd = csv.DictReader(fp)
            yield from pool.imap_unordered(classify_row, rd)
    finally:
        classifier = None

Я не доволен этим из-за глобальной переменной и неявной связи между classify и classify_row.В идеале я хотел бы написать

def classify(classifier_spec, data_file):
    classifier = Classifier(classifier_spec)
    with open(data_file, "rt") as fp, \
         multiprocessing.Pool() as pool:
        rd = csv.DictReader(fp)
        yield from pool.imap_unordered(classifier.classify, rd)

, но это не работает, потому что объект Classifier обычно содержит объекты, которые не могут быть обработаны (потому что они определены модулями расширения, авторам которых это не важно);Я также читал, что это будет очень медленно, если он будет работать, потому что объект Classifier будет скопирован в рабочие процессы при каждом вызове связанного метода.

Есть ли лучшеальтернатива?Я забочусь только о 3.x.

Ответы [ 3 ]

0 голосов
/ 07 октября 2018

Модуль multiprocessing.sharedctypes предоставляет функции для выделения объектов ctypes из общей памяти, которые могут наследоваться дочерними процессами, т. Е. Родительские и дочерние объекты могут получить доступ к общей памяти.

Вы можетеиспользуйте
1. multiprocessing.sharedctypes.RawArray для выделения массива ctypes из общей памяти.
2. multiprocessing.sharedctypes.RawValue для выделения объекта ctypes из общей памяти.

Доктор Мианжи Ван написал очень подробный документ об этом.Вы можете поделиться несколькими multiprocessing.sharedctypes объектами.

Вы можете найти решение здесь полезным для вас.

0 голосов
/ 10 октября 2018

Если вы хотите использовать разветвление, я не вижу пути использования глобального.Но я также не вижу причины, по которой вам пришлось бы чувствовать себя плохо из-за использования глобального в этом случае, вы не управляете глобальным списком с помощью многопоточности или около того.

С этим можно справитьсяхотя уродство в вашем примере.Вы хотите передать classifier.classify напрямую, но объект Classifier содержит объекты, которые невозможно протравить.

import os
import csv
import uuid
from threading import Lock
from multiprocessing import Pool
from weakref import WeakValueDictionary

class Classifier:

    def __init__(self, spec):
        self.lock = Lock()  # unpickleable
        self.spec = spec

    def classify(self, row):
        return f'classified by pid: {os.getpid()} with spec: {self.spec}', row

Я предлагаю подкласс Classifier и определить __getstate__ и __setstate__ для включения травления.Так как вы в любом случае используете разветвление, все, что нужно, это информация о том, как получить ссылку на разветвленный глобальный экземпляр.Затем мы просто обновим __dict__ протравленного объекта на __dict__ разветвленного экземпляра (который не подвергся уменьшению травления), и ваш экземпляр снова будет завершен.

Для этого безДополнительный шаблонный экземпляр Classifier должен создать для себя имя и зарегистрировать его как глобальную переменную.Эта первая ссылка будет слабой ссылкой, поэтому экземпляр можно будет собирать, когда пользователь этого ожидает.Вторая ссылка создается пользователем, когда он назначает classifier = Classifier(classifier_spec).Этот не обязательно должен быть глобальным.

Сгенерированное имя в примере ниже генерируется с помощью модуля uuid standard-lib.Uuid преобразуется в строку и редактируется в допустимый идентификатор (это не должно быть, но это удобно для отладки в интерактивном режиме).

class SubClassifier(Classifier):

    def __init__(self, spec):
        super().__init__(spec)
        self.uuid = self._generate_uuid_string()
        self.pid = os.getpid()
        self._register_global()

    def __getstate__(self):
        """Define pickled content."""
        return {'uuid': self.uuid}

    def __setstate__(self, state):
        """Set state in child process."""
        self.__dict__ = state
        self.__dict__.update(self._get_instance().__dict__)

    def _get_instance(self):
        """Get reference to instance."""
        return globals()[self.uuid][self.uuid]

    @staticmethod
    def _generate_uuid_string():
        """Generate id as valid identifier."""
        # return 'uuid_' + '123' # for testing
        return 'uuid_' + str(uuid.uuid4()).replace('-', '_')

    def _register_global(self):
        """Register global reference to instance."""
        weakd = WeakValueDictionary({self.uuid: self})
        globals().update({self.uuid: weakd})

    def __del__(self):
        """Clean up globals when deleted in parent."""
        if os.getpid() == self.pid:
            globals().pop(self.uuid)

Самое приятное здесь то, что шаблонполностью ушелВам не нужно связываться вручную с объявлением и удалением глобальных переменных, поскольку экземпляр управляет всем сам в фоновом режиме:

def classify(classifier_spec, data_file, n_workers):
    classifier = SubClassifier(classifier_spec)
    # assert globals()['uuid_123']['uuid_123'] # for testing
    with open(data_file, "rt") as fh, Pool(n_workers) as pool:
        rd = csv.DictReader(fh)
        yield from pool.imap_unordered(classifier.classify, rd)


if __name__ == '__main__':

    PATHFILE = 'data.csv'
    N_WORKERS = 4

    g = classify(classifier_spec='spec1', data_file=PATHFILE, n_workers=N_WORKERS)
    for record in g:
        print(record)

   # assert 'uuid_123' not in globals() # no reference left
0 голосов
/ 07 октября 2018

Это было удивительно сложно.Ключевым моментом здесь является сохранение доступа для чтения к переменным, которые доступны во время разветвления без сериализации.Большинство решений для разделения памяти в многопроцессорной среде заканчиваются сериализацией.Я пытался использовать weakref.proxy для передачи классификатора без сериализации, но это не сработало, потому что и укроп, и соленый огурец будут пытаться следовать и сериализовать референт. Однако , модуль-ссылка работает.

Эта организация приближает нас:

import multiprocessing as mp
import csv


def classify(classifier, data_file):

    with open(data_file, "rt") as fp, mp.Pool() as pool:
        rd = csv.DictReader(fp)
        yield from pool.imap_unordered(classifier.classify, rd)


def orchestrate(classifier_spec, data_file):
    # construct a classifier from the spec; note that we can
    # even dynamically import modules here, using config values
    # from the spec
    import classifier_module
    classifier_module.init(classifier_spec)
    return classify(classifier_module, data_file)


if __name__ == '__main__':
    list(orchestrate(None, 'data.txt'))

Несколько изменений, чтобы отметить здесь:

  • мы добавили orchestrate метод для некоторого совершенства DI;оркестровка выясняет, как построить / инициализировать классификатор, и передает его classify, развязывая два
  • classify, нужно только предположить, что параметр classifier имеет метод classify;ему все равно, является ли он экземпляром или модулем

Для этого доказательства концепции мы предоставляем классификатор, который явно не сериализуем:

# classifier_module.py
def _create_classifier(spec):

    # obviously not pickle-able because it's inside a function
    class Classifier():

        def __init__(self, spec):
            pass

        def classify(self, x):
            print(x)
            return x

    return Classifier(spec)


def init(spec):
    global __classifier
    __classifier = _create_classifier(spec)


def classify(x):
    return __classifier.classify(x)

К сожалению, все еще естьздесь глобальный, но теперь он приятно инкапсулирован внутри модуля как частная переменная, и модуль экспортирует плотный интерфейс, состоящий из функций classify и init.

Эта конструкция открывает некоторые возможности:

  • orchestrate может импортировать и инициировать различные модули классификатора, основываясь на том, что он видит в classifier_spec
  • , можно также передать экземпляр некоторого класса Classifier в classifyДо тех пор, пока этот экземпляр является сериализуемым и имеет метод классификации той же сигнатуры
...