Совместное использование сложного объекта между процессами Python? - PullRequest
34 голосов
/ 09 сентября 2010

У меня довольно сложный объект Python, который мне нужно разделить между несколькими процессами. Я запускаю эти процессы, используя multiprocessing.Process. Когда я делю объект с multiprocessing.Queue и multiprocessing.Pipe, они делятся просто отлично. Но когда я пытаюсь поделиться объектом с другими объектами, не являющимися мультипроцессорными модулями, кажется, что Python разветвляет эти объекты. Это правда?

Я пытался использовать многопроцессорность. Значение. Но я не уверен, какой тип должен быть? Мой объектный класс называется MyClass. Но когда я пытаюсь multiprocess.Value(MyClass, instance), он терпит неудачу с:

TypeError: this type has no size

Есть идеи, что происходит?

Ответы [ 6 ]

27 голосов
/ 13 сентября 2010

Вы можете сделать это, используя многопроцессорные классы Python «Manager» и прокси-класс, который вы определяете.Из документов Python: http://docs.python.org/library/multiprocessing.html#proxy-objects

То, что вы хотите сделать, это определить прокси-класс для вашего пользовательского объекта, а затем поделиться объектом с помощью «Удаленного менеджера» - посмотрите на примеры в том же связанномСтраница документа для "удаленного менеджера", где документы показывают, как совместно использовать удаленную очередь.Вы будете делать то же самое, но ваш вызов your_manager_instance.register () включит ваш собственный прокси-класс в свой список аргументов.

Таким образом, вы настраиваете сервер для совместного использования пользовательского объекта с пользовательским прокси.Вашим клиентам необходим доступ к серверу (опять же, посмотрите отличные примеры документации о том, как настроить клиент / серверный доступ к удаленной очереди, но вместо того, чтобы делиться очередью, вы делитесь доступом к вашему определенному классу).

23 голосов
/ 28 сентября 2016

После долгих исследований и испытаний я обнаружил, что «Менеджер» выполняет эту работу на не сложном уровне объекта.

Приведенный ниже код показывает, что объект inst совместно используетсяпроцессы, что означает, что свойство var из inst изменяется снаружи, когда дочерний процесс изменяет его.

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class SimpleClass(object):
    def __init__(self):
        self.var = 0

    def set(self, value):
        self.var = value

    def get(self):
        return self.var


def change_obj_value(obj):
    obj.set(100)


if __name__ == '__main__':
    BaseManager.register('SimpleClass', SimpleClass)
    manager = BaseManager()
    manager.start()
    inst = manager.SimpleClass()

    p = Process(target=change_obj_value, args=[inst])
    p.start()
    p.join()

    print inst                    # <__main__.SimpleClass object at 0x10cf82350>
    print inst.get()              # 100

Хорошо, приведенный выше код достаточно , если вам нужно только поделиться простые объекты .

Почему нет комплекса?Поскольку может произойти сбой , если ваш объект является вложенным (объект внутри объекта):

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class GetSetter(object):
    def __init__(self):
        self.var = None

    def set(self, value):
        self.var = value

    def get(self):
        return self.var


class ChildClass(GetSetter):
    pass

class ParentClass(GetSetter):
    def __init__(self):
        self.child = ChildClass()
        GetSetter.__init__(self)

    def getChild(self):
        return self.child


def change_obj_value(obj):
    obj.set(100)
    obj.getChild().set(100)


if __name__ == '__main__':
    BaseManager.register('ParentClass', ParentClass)
    manager = BaseManager()
    manager.start()
    inst2 = manager.ParentClass()

    p2 = Process(target=change_obj_value, args=[inst2])
    p2.start()
    p2.join()

    print inst2                    # <__main__.ParentClass object at 0x10cf82350>
    print inst2.getChild()         # <__main__.ChildClass object at 0x10cf6dc50>
    print inst2.get()              # 100
    #good!

    print inst2.getChild().get()   # None
    #bad! you need to register child class too but there's almost no way to do it
    #even if you did register child class, you may get PicklingError :)

Я думаю, что основная причина такого поведения заключается в том, что Manager - это просто моноблок на вершинеиз низкоуровневых коммуникационных инструментов, таких как труба / очередь.

Итак, этот подход не хорошо рекомендуется для многопроцессорной обработки.Всегда лучше, если вы можете использовать низкоуровневые инструменты, такие как lock / semaphore / pipe / queue или высокоуровневые инструменты, такие как Redis queue или Redis, опубликовать / подписаться для сложных случаев использования (только моя рекомендация LOL).

5 голосов
/ 13 ноября 2017

вот пакет Python, который я сделал только для этого (совместное использование сложных объектов между процессами).

git: https://github.com/dRoje/pipe-proxy

Идея в том, что вы создаете прокси для своего объекта и передаете егок процессу.Затем вы используете прокси, как у вас есть ссылка на исходный объект.Хотя вы можете использовать только вызовы методов, поэтому доступ к переменным объекта осуществляется через методы установки и получения.

Скажем, у нас есть объект с именем 'example', создание прокси и прослушивателя прокси легко:

from pipeproxy import proxy 
example = Example() 
exampleProxy, exampleProxyListener = proxy.createProxy(example) 

Теперь вы отправляете прокси другому процессу.

p = Process(target=someMethod, args=(exampleProxy,)) p.start()

Используйте его в другом процессе, как если бы вы использовали исходный объект (пример):

def someMethod(exampleProxy):
    ...
    exampleProxy.originalExampleMethod()
    ...

Но вы должны слушать его в основном процессе:

exampleProxyListener.listen()

Читатьбольше и найти примеры здесь:

http://matkodjipalo.com/index.php/2017/11/12/proxy-solution-python-multiprocessing/

2 голосов
/ 06 июня 2018

Я попытался использовать BaseManager и зарегистрировать свой настроенный класс, чтобы сделать его счастливым, и получить проблему с вложенным классом, как Том уже упоминал выше.

Я думаю, что основная причина не имеет отношения к вложенному классу, как было сказано, но механизм коммуникации, который питон использует на низком уровне. Причина в том, что Python использует некоторый механизм обмена данными, подобный сокету, для синхронизации изменения настроенного класса в процессе сервера на низком уровне. Я думаю, что он инкапсулирует некоторые методы rpc, делает его просто прозрачным для пользователя, как если бы он вызывал локальные методы объекта вложенного класса.

Таким образом, когда вы хотите изменить, получить свои собственные объекты или некоторые сторонние объекты, вы должны определить некоторые интерфейсы в своих процессах для связи с ним, а не напрямую получать или устанавливать значения.

Тем не менее, при работе с вложенными объектами во вложенных объектах, можно игнорировать проблемы, упомянутые выше, так же, как вы делаете это в своей обычной подпрограмме, потому что ваши вложенные объекты в зарегистрированном классе больше не являются объектами прокси который операция не будет проходить через процедуру сокет-подобный снова и локализован.

Вот рабочий код, который я написал для решения проблемы.

from multiprocessing import Process, Manager, Lock
from multiprocessing.managers import BaseManager
import numpy as np

class NestedObj(object):
       def __init__(self):
                self.val = 1

class CustomObj(object):
        def __init__(self, numpy_obj):
                self.numpy_obj = numpy_obj
                self.nested_obj = NestedObj()

        def set_value(self, p, q, v):
                self.numpy_obj[p, q] = v

        def get_obj(self):
                return self.numpy_obj

        def get_nested_obj(self):
                return self.nested_obj.val

class CustomProcess(Process):
        def __init__(self, obj, p, q, v):
                super(CustomProcess, self).__init__()
                self.obj = obj
                self.index = p, q
                self.v = v

        def run(self):
                self.obj.set_value(*self.index, self.v)



if __name__=="__main__":
        BaseManager.register('CustomObj', CustomObj)
        manager = BaseManager()
        manager.start()
        data = [[0 for x in range(10)] for y in range(10)]
        matrix = np.matrix(data)
        custom_obj = manager.CustomObj(matrix)
        print(custom_obj.get_obj())
        process_list = []
        for p in range(10):
                for q in range(10):
                        proc = CustomProcess(custom_obj, p, q, 10*p+q)
                        process_list.append(proc)
        for x in range(100):
                process_list[x].start()
        for x in range(100):
                process_list[x].join()
        print(custom_obj.get_obj())
        print(custom_obj.get_nested_obj())
0 голосов
/ 22 марта 2019

В Python 3.6 документы говорят:

Изменено в версии 3.6: общие объекты могут быть вложенными.Например, общий контейнерный объект, такой как общий список, может содержать другие общие объекты, которые все будут управляться и синхронизироваться SyncManager.

Пока экземпляры создаются с помощью SyncManager, вам следуетвозможность сделать объекты ссылаться друг на друга.Динамическое создание объекта одного типа в методах объекта другого типа все еще может быть невозможным или очень сложным.

Редактировать: я наткнулся на эту проблему Многопроцессорные менеджеры и пользовательские классы с python3.6.5 и 3.6.7.Необходимо проверить Python 3.7

Редактировать 2: Из-за некоторых других проблем я не могу в настоящее время проверить это с python3.7.Обходной путь, представленный в https://stackoverflow.com/a/50878600/7541006, отлично работает для меня

0 голосов
/ 11 июня 2017

Чтобы сохранить некоторые головные боли с общими ресурсами, вы можете попытаться собрать данные, которым требуется доступ к одноэлементному ресурсу, в операторе возврата функции, которая отображается, например, в. pool.imap_unordered и последующая обработка в цикле, который извлекает частичные результаты:

for result in in pool.imap_unordered(process_function, iterable_data):
    do_something(result)

Если будет возвращено не так много данных, то при этом может не потребоваться много времени.

...