Отказ от ответственности: я автор вопроса.
В конечном итоге я использовал модуль posix_ipc , чтобы создать свою собственную версию RawArray ,Я использовал в основном posix_ipc.SharedMemory
, который вызывает shm_open()
под капотом.
Моя реализация (ShmemRawArray
) предоставляет ту же функциональность, что и RawArray
, но требует двух дополнительных параметров - a tag
для уникальной идентификации области совместно используемой памяти и флаг create
, чтобы определить, следует ли нам создать новый сегмент совместно используемой памяти или присоединить к существующему.
Вот копия, если кому-то интересно: https://gist.github.com/1222327
ShmemRawArray(typecode_or_type, size_or_initializer, tag, create=True)
Замечания по использованию:
- Первые два аргумента (
typecode_or_type
и size_or_initializer
) должны работать так же, как с RawArray
. - Общий массив доступен любому процессу, если соответствует
tag
. - Сегмент разделяемой памяти не связывается при удалении исходного объекта (возвращаемого
ShmemRawArray(..., create=True)
) - Созданиесовместно используемый массив, использующий
tag
, который существует в настоящее время, вызовет ExistentialError
- . При доступе к совместно используемому массиву, используя
tag
, который не существует (или тот, который не был связан), также вызовет ExistentialError
A SSCCE (короткий, автономный, компилируемый пример), показывающий его в действии.
#!/usr/bin/env python2.7
import ctypes
import multiprocessing
from random import random, randint
from shmemctypes import ShmemRawArray
class Point(ctypes.Structure):
_fields_ = [ ("x", ctypes.c_double), ("y", ctypes.c_double) ]
def worker(q):
# get access to ctypes array shared by parent
count, tag = q.get()
shared_data = ShmemRawArray(Point, count, tag, False)
proc_name = multiprocessing.current_process().name
print proc_name, ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
if __name__ == '__main__':
procs = []
np = multiprocessing.cpu_count()
queue = multiprocessing.Queue()
# spawn child processes
for i in xrange(np):
p = multiprocessing.Process(target=worker, args=(queue,))
procs.append(p)
p.start()
# create a unique tag for shmem segment
tag = "stack-overflow-%d" % multiprocessing.current_process().pid
# random number of points with random data
count = randint(3,10)
combined_data = [Point(x=random(), y=random()) for i in xrange(count)]
# create ctypes array in shared memory using ShmemRawArray
# - we won't be able to use multiprocssing.sharectypes.RawArray here
# because children already spawned
shared_data = ShmemRawArray(Point, combined_data, tag)
# give children info needed to access ctypes array
for p in procs:
queue.put((count, tag))
print "Parent", ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
for p in procs:
p.join()
Запуск этого приводит к следующему выводу:
[me@home]$ ./shmem_test.py
Parent ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-1 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-2 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-3 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-4 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']