Предоставление доступа к общей памяти после того, как дочерние процессы уже запущены - PullRequest
11 голосов
/ 14 сентября 2011

Как предоставить дочерним процессам доступ к данным в общей памяти, если данные доступны только после того, как дочерние процессы были созданы (используя multiprocessing.Process )?

Мне известно о multiprocessing.sharedctypes.RawArray , но я не могу понять, как предоставить моим дочерним процессам доступ к RawArray, который создается после того, как процессы уже запущены.

Данные генерируются родительским процессом, а объем данных заранее неизвестен.

Если бы не GIL , я бы вместо этого использовал многопоточность, которая немного упростит эту задачу. Использование не-CPython реализации не вариант.


Если заглянуть под капот muliprocessing.sharedctypes , похоже, что общие объекты ctype выделяются с использованием mmap ed памяти .

Таким образом, этот вопрос действительно сводится к следующему: Может ли дочерний процесс получить доступ к анонимно отображенной памяти, если родительский файл после вызова дочернего процесса вызвал mmap()?

Это несколько в духе того, что задают в этом вопросе , за исключением того, что в моем случае вызывающий mmap() является родительским процессом, а не дочерним процессом.


(Раскрыты)

Я создал свою собственную версию RawArray, которая использует shm_open() под капотом. Полученный в результате общий массив ctypes может использоваться совместно любым процессом, если совпадает идентификатор (tag).

См. этот ответ для деталей и примера.

Ответы [ 3 ]

6 голосов
/ 16 сентября 2011

Ваша проблема звучит как идеально подходящая для posix_ipc или sysv_ipc модулей , которые предоставляют API POSIX или SysV для совместной памяти, семафоров и очередей сообщений.Матрица функций содержит отличные рекомендации по выбору модулей, которые он предоставляет.

Проблема с анонимными mmap(2) областями заключается в том, что вы не можете легко поделиться ими с другими процессами - если они были заархивированы, это 'Было бы легко, но если вам на самом деле не нужен файл для чего-то еще, это кажется глупым.Вы могли бы использовать флаг CLONE_VM для системного вызова clone(2), если бы это было в C, но я бы не хотел пытаться использовать его с интерпретатором языка, который, вероятно, делает предположения о безопасности памяти.(Это было бы немного опасно даже в C, поскольку программисты по техническому обслуживанию через пять лет также могут быть шокированы поведением CLONE_VM.)

Но SysV и более новая POSIX поделилисьсопоставления памяти позволяют даже несвязанным процессам присоединяться и отсоединяться от разделяемой памяти по идентификатору, поэтому все, что вам нужно сделать, это поделиться идентификатором из процессов, которые создают сопоставления, с процессами, которые используют сопоставления, а затем при манипулировании данными в сопоставленияхони доступны всем процессам одновременно без каких-либо дополнительных затрат на анализ.Функция shm_open(3) возвращает int, который используется в качестве дескриптора файла при последующих вызовах ftruncate(2), а затем mmap(2), поэтому другие процессы могут использовать сегмент общей памяти без создания файла в файловой системе - иэта память будет сохраняться, даже если все процессы, использующие ее, завершены.(Возможно, немного странно для Unix, но он гибкий.)

6 голосов
/ 16 сентября 2011

Отказ от ответственности: я автор вопроса.

В конечном итоге я использовал модуль posix_ipc , чтобы создать свою собственную версию RawArray ,Я использовал в основном posix_ipc.SharedMemory, который вызывает shm_open() под капотом.

Моя реализация (ShmemRawArray) предоставляет ту же функциональность, что и RawArray, но требует двух дополнительных параметров - a tag для уникальной идентификации области совместно используемой памяти и флаг create, чтобы определить, следует ли нам создать новый сегмент совместно используемой памяти или присоединить к существующему.

Вот копия, если кому-то интересно: https://gist.github.com/1222327

ShmemRawArray(typecode_or_type, size_or_initializer, tag, create=True)

Замечания по использованию:

  • Первые два аргумента (typecode_or_type и size_or_initializer) должны работать так же, как с RawArray.
  • Общий массив доступен любому процессу, если соответствует tag.
  • Сегмент разделяемой памяти не связывается при удалении исходного объекта (возвращаемого ShmemRawArray(..., create=True))
  • Созданиесовместно используемый массив, использующий tag, который существует в настоящее время, вызовет ExistentialError
  • . При доступе к совместно используемому массиву, используя tag, который не существует (или тот, который не был связан), также вызовет ExistentialError

A SSCCE (короткий, автономный, компилируемый пример), показывающий его в действии.

#!/usr/bin/env python2.7
import ctypes
import multiprocessing
from random import random, randint
from shmemctypes import ShmemRawArray

class Point(ctypes.Structure):
    _fields_ = [ ("x", ctypes.c_double), ("y", ctypes.c_double) ]

def worker(q):
    # get access to ctypes array shared by parent
    count, tag = q.get()
    shared_data = ShmemRawArray(Point, count, tag, False)

    proc_name = multiprocessing.current_process().name
    print proc_name, ["%.3f %.3f" % (d.x, d.y) for d in shared_data]

if __name__ == '__main__':
    procs = []
    np = multiprocessing.cpu_count()
    queue = multiprocessing.Queue()

    # spawn child processes
    for i in xrange(np):
        p = multiprocessing.Process(target=worker, args=(queue,))
        procs.append(p)
        p.start()

    # create a unique tag for shmem segment
    tag = "stack-overflow-%d" % multiprocessing.current_process().pid

    # random number of points with random data
    count = randint(3,10) 
    combined_data = [Point(x=random(), y=random()) for i in xrange(count)]

    # create ctypes array in shared memory using ShmemRawArray
    # - we won't be able to use multiprocssing.sharectypes.RawArray here 
    #   because children already spawned
    shared_data = ShmemRawArray(Point, combined_data, tag)

    # give children info needed to access ctypes array
    for p in procs:
        queue.put((count, tag))

    print "Parent", ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
    for p in procs:
        p.join()

Запуск этого приводит к следующему выводу:

[me@home]$ ./shmem_test.py
Parent ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-1 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-2 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-3 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-4 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
0 голосов
/ 14 сентября 2011

Я думаю, что вы ищете модуль mmap

относительно сериализации данных на этот вопрос ответ, конечно, если вы хотите избежать копирования, у меня нет решения

EDIT

на самом деле вы можете использовать модуль не stdlib _mutliprocessing в CPython 3.2, чтобы получить адрес объекта mmap и использовать его с адресом from_address объекта ctypes это то, что фактически делает RawArray, на самом деле, конечно, вы не должны пытаться изменить размер объекта mmap, так как адрес mmap может измениться в этом случае

import mmap
import _multiprocessing
from ctypes import Structure,c_int

map = mmap.mmap(-1,4)
class A(Structure):
    _fields_ = [("x", c_int)]
x = _multiprocessing.address_of_buffer(map)
b=A.from_address(x[0])
b.x = 256

>>> map[0:4]
'\x00\x01\x00\x00'

чтобы раскрыть память после создания ребенка, вы должны сопоставить вашу память с реальным файлом, который вызывает

map = mmap.mmap(open("hello.txt", "r+b").fileno(),4)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...