Передача ссылок на объекты в очереди между процессами - PullRequest
0 голосов
/ 19 июня 2019

У меня есть несколько multiprocessing.Process ов, и я хотел бы, чтобы они потребляли (очередь get()) вызываемые необратимые объекты и вызывали их.Они были созданы до fork(), поэтому они не должны нуждаться в травлении.

Использование multiprocessing.Queue не работает, поскольку оно пытается засолить все:

import multiprocessing as mp

# create non-global callable to make it unpicklable
def make_callable():
    def foo():
        print("running foo")
    return foo

def bar():
    print("running bar")

def runall(q):
    while True:
        c = q.get()
        if c is None:
            break
        c()

if __name__ == '__main__':
    q = mp.Queue()
    call = make_callable()
    p = mp.Process(target=runall, args=(q,))
    p.start()
    q.put(bar)
    q.put(call)
    q.put(None)
    p.join()
running bar
Traceback (most recent call last):
  File "/usr/lib64/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib64/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'make_callable.<locals>.foo'

Эквивалентом реализации будет помещение всех объектов в глобальный (или переданный) список и передача только индексов, что работает:

import multiprocessing as mp

# create non-global callable to make it unpicklable
def make_callable():
    def foo():
        print("running foo")
    return foo

def bar():
    print("running bar")

def runall(q, everything):
    while True:
        c = q.get()
        if c is None:
            break
        everything[c]()

if __name__ == '__main__':
    q = mp.Queue()
    call = make_callable()
    everything = [bar, call]
    p = mp.Process(target=runall, args=(q,everything))
    p.start()
    q.put(0)
    q.put(1)
    q.put(None)
    p.join()
running bar
running foo

Проблема в том, что, хотя я знаю, что ни один из передаваемых вызовов не будетбыть сборщиком мусора (и, следовательно, их адреса останутся действительными), у меня нет полного списка заранее.

Я также знаю, что мог бы, вероятно, использовать multiprocessing.Manager и его реализацию Queue с использованием объекта Proxy, но это кажется большой нагрузкой, особенно в реальной реализации, я бы также передавал другие извлекаемые данные.

Есть ли способ выбрать и передать только адресную ссылку на объект, общий для всехнесколько процессов?

Спасибо!

Ответы [ 2 ]

0 голосов
/ 19 июня 2019

После долгих размышлений и поисков, я думаю, у меня есть ответ, который я искал, в основном из: Получить объект по id ()? .

Я мог бы передать id() вызываемого и затем перевести его обратно в порожденный процесс:

import ctypes
a = "hello world"
print ctypes.cast(id(a), ctypes.py_object).value

Или используйте модуль gc и, пока я сохраняю ссылку на объект, он тоже должен работать:

import gc

def objects_by_id(id_):
    for obj in gc.get_objects():
        if id(obj) == id_:
            return obj
    raise Exception("No found")

Однако ни один из них не является очень чистым, и, в конце концов, возможно, стоит наложить ограничение на то, чтобы сначала были все вызываемые объекты и просто проходили индексы.

0 голосов
/ 19 июня 2019

Истинно, что целевые объекты Process должны быть доступны для выбора.

Обратите внимание, что функции (встроенные и определяемые пользователем) выбираются по «полностью квалифицированному» имени, а не по значению. Это означает, чточто выбирается только имя функции, а также имя модуля, в котором определена функция. Ни код функции, ни какой-либо из ее атрибутов функции не обрабатываются.Таким образом, определяющий модуль должен быть импортируемым в среде удаления, и модуль должен содержать именованный объект, в противном случае будет сгенерировано исключение.

Выбираемые функции и классы должны быть определены на верхнем уровне модуля.

Таким образом, в вашем случае вам необходимо приступить к прохождению вызовов верхнего уровня, но применяя дополнительные проверки / обходные путив решающей runall функции:

import multiprocessing as mp

# create non-global callable to make it unpicklable
def make_callable():
    def foo():
        print("running foo")
    return foo

def bar():
    print("running bar")

def runall(q):
    while True:
        c = q.get()
        if c is None:
            break

        res = c()
        if callable(res): res()


if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=runall, args=(q,))
    p.start()

    q.put(bar)
    q.put(make_callable)
    q.put(None)

    p.join()
    q.close() 

Выход:

running bar
running foo
...