Как создать синхронизированный объект с многопроцессорной обработкой Python? - PullRequest
1 голос
/ 17 февраля 2011

Мне трудно понять, как создать синхронизированный объект Python.У меня есть класс с именем Observation и класс с именем Variable, который в основном выглядит (код упрощен, чтобы показать суть):

class Observation:
    def __init__(self, date, time_unit, id, meta):
        self.date = date
        self.time_unit = time_unit
        self.id = id
        self.count = 0
        self.data = 0

    def add(self, value):
        if isinstance(value, list):
            if self.count == 0:
                self.data = []
            self.data.append(value)
        else:
            self.data += value
        self.count += 1


class Variable:
    def __init__(self, name, time_unit, lock):
        self.name = name
        self.lock = lock
        self.obs = {}
        self.time_unit = time_unit

    def get_observation(self, id, date, meta):
        self.lock.acquire()
        try:
            obs = self.obs.get(id, Observation(date, self.time_unit, id, meta))
            self.obs[id] = obs
        finally:
            self.lock.release()
        return obs

    def add(self, date, value, meta={}):
        self.lock.acquire()
        try:
            obs = self.get_observation(id, date, meta)
            obs.add(value)
            self.obs[id] = obs
        finally:
            self.lock.release()

Вот как я настраиваю многопроцессорную часть: plugin = функция, определенная где-то еще задач= JoinableQueue () result = JoinableQueue () mgr = Manager () lock = mgr.RLock () var = Переменная ('foobar', 'year', lock)

for person in persons:
    tasks.put(Task(plugin, var, person))

Пример кодадолжен работать:

У меня есть экземпляр Variable, называемый var, и я хочу добавить наблюдение к var:

today = datetime.datetime.today()  
var.add(today, 1)  

Итак, функция добавления Variable проверяет, существует ли уженаблюдение за этой датой, если это так, то оно возвращает это наблюдение, иначе создает новый экземпляр наблюдения.Обнаружив наблюдение, к которому добавляется фактическое значение, вызывая obs.add (значение).Моя главная проблема заключается в том, что я хочу убедиться, что разные процессы не создают несколько экземпляров Наблюдения за одну и ту же дату, поэтому я его блокирую.

Один экземпляр переменной создается и совместно используется различными процессами, использующими многопроцессорную библиотеку, и является контейнером для многочисленных экземпляров наблюдения.Приведенный выше код не работает, я получаю сообщение об ошибке:

RuntimeError: Блокировка объектов должна быть разделена между процессами только через наследование

Однако, если я создаю экземпляр объекта Lockперед запуском различных процессов и передачей их в конструктор Variable, мне кажется, что я получаю условие состязания, поскольку все процессы, похоже, ждут друг друга.

Конечная цель состоит в том, чтобы различные процессы могли обновлять переменную obs в объекте Variable.Мне нужно, чтобы это было поточно-ориентированным, потому что я не просто изменяю словарь на месте, но добавляю новые элементы и увеличиваю существующие переменные.переменная obs - это словарь, который содержит несколько экземпляров Observation.

Как я могу сделать это синхронизированным, когда я разделяю один единственный экземпляр Переменной между многочисленными многопроцессорными процессами?Большое спасибо за ваш когнитивный избыток!

ОБНОВЛЕНИЕ 1:
* Я использую многопроцессорные блокировки, и я изменил исходный код, чтобы показать это.
* Я изменилназвание для более точного определения проблемы
* Я заменил theadsafe на синхронизацию, где я перепутал два термина.

Спасибо Дмитрию Двойникову за указание на меня!

Один вопрос, в котором я до сих пор не уверен, - где мне создать экземпляр Lock?Должно ли это произойти внутри класса или перед инициализацией многопроцессорных процессов и передачей их в качестве аргумента?ОТВЕТ: Должно происходить вне класса.

ОБНОВЛЕНИЕ 2:
* Я исправил ошибку «Объекты блокировки должны быть разделены между процессами только через наследование» путем перемещения инициализации блокировки.вне определения класса и использования менеджера.
* Последний вопрос, теперь все работает, за исключением того, что кажется, что когда я помещаю свой экземпляр Variable в очередь, он не обновляется, и каждый раз, когда я получаю его из очереди, он делаетне содержит наблюдения, которое я добавил в предыдущей итерации.Это единственное, что меня смущает: (

ОБНОВЛЕНИЕ 3:
Окончательное решение состояло в том, чтобы установить словарь var.obs для экземпляра mgr.dict () изатем иметь собственный сериализатор. Счастлив, что поделитесь кодом с кем-то, кто также борется с этим.

1 Ответ

3 голосов
/ 17 февраля 2011

Вы говорите не о безопасности потоков, а о синхронизации между отдельными процессами, и это совершенно другое.В любом случае, для запуска

различные процессы могут обновить переменную obs в объекте Variable.

подразумевает, что переменная находится в разделяемой памяти, и вы должны явно хранить там объекты, не по волшебству локальный экземпляр становится видимым для отдельного процесса.Здесь:

Данные могут быть сохранены в карте общей памяти с использованием Value или Array

Тогда в вашем фрагменте кода отсутствует важный раздел импорта.Невозможно определить, используете ли вы правильную многопроцессорную блокировку. Не многопоточность. Блокировка.Ваш код не показывает, как вы создаете процессы и передаете данные.

Поэтому я бы посоветовал вам осознать разницу между потоками и процессами, действительно ли вам нужна модель общей памяти для приложения, котороесодержит несколько процессов и проверьте spec .

...