Как передать большие числовые массивы между подпроцессами python без сохранения на диск? - PullRequest
26 голосов
/ 17 февраля 2011

Есть ли хороший способ передать большой кусок данных между двумя подпроцессами python без использования диска? Вот мультипликационный пример того, чего я надеюсь достичь:

import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        data.dump('data.pkl')
        sys.stdout.write('data.pkl' + '\\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data\n')
    print proc.stdout.readline().rstrip()
    a = numpy.load('data.pkl')
    print a.shape

proc.stdin.write('done\n')

Это создает подпроцесс, который генерирует пустой массив и сохраняет массив на диск. Затем родительский процесс загружает массив с диска. Это работает!

Проблема в том, что наше оборудование может генерировать данные в 10 раз быстрее, чем диск может читать / записывать. Есть ли способ передачи данных из одного процесса Python в другой исключительно в памяти, может быть, даже без создания копии данных? Могу ли я сделать что-то вроде передачи по ссылке?

Моя первая попытка передачи данных исключительно в памяти довольно паршивая:

import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        ##Note that this is NFG if there's a '10' in the array:
        sys.stdout.write(data.tostring() + '\\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data\n')
    a = numpy.fromstring(proc.stdout.readline().rstrip(), dtype=numpy.uint8)
    print a.shape

proc.stdin.write('done\n')

Это очень медленно (намного медленнее, чем сохранение на диск) и очень, очень хрупкое. Должен быть лучший способ!

Я не женат на модуле 'подпроцесс', если процесс сбора данных не блокирует родительское приложение. Я кратко попробовал «многопроцессорность», но пока безуспешно.

Справочная информация: У нас есть аппаратное обеспечение, которое генерирует до ~ 2 ГБ / с данных в серии буферов ctypes. Код Python для работы с этими буферами полностью занят работой с потоком информации. Я хочу координировать этот поток информации с несколькими другими аппаратными компонентами, работающими одновременно в основной программе, без подпроцессов, блокирующих друг друга. В настоящее время мой подход заключается в том, чтобы немного сбить данные в подпроцессе перед сохранением на диск, но было бы неплохо передать всю монтировку процессу 'master'.

Ответы [ 6 ]

26 голосов
/ 18 февраля 2011

В поисках дополнительной информации о коде, опубликованном Джо Кингтоном, я обнаружил пакет numpy-sharedmem . Судя по этому учебному пособию по многопроцессорной обработке , похоже, что оно обладает одним и тем же интеллектуальным наследием (может быть, в значительной степени теми же авторами? - я не уверен).

Используя модуль sharedmem, вы можете создать пустой массив совместно используемой памяти (потрясающе!) И использовать его с многопроцессорной обработкой следующим образом:

import sharedmem as shm
import numpy as np
import multiprocessing as mp

def worker(q,arr):
    done = False
    while not done:
        cmd = q.get()
        if cmd == 'done':
            done = True
        elif cmd == 'data':
            ##Fake data. In real life, get data from hardware.
            rnd=np.random.randint(100)
            print('rnd={0}'.format(rnd))
            arr[:]=rnd
        q.task_done()

if __name__=='__main__':
    N=10
    arr=shm.zeros(N,dtype=np.uint8)
    q=mp.JoinableQueue()    
    proc = mp.Process(target=worker, args=[q,arr])
    proc.daemon=True
    proc.start()

    for i in range(3):
        q.put('data')
        # Wait for the computation to finish
        q.join()   
        print arr.shape
        print(arr)
    q.put('done')
    proc.join()

Беговые урожаи

rnd=53
(10,)
[53 53 53 53 53 53 53 53 53 53]
rnd=15
(10,)
[15 15 15 15 15 15 15 15 15 15]
rnd=87
(10,)
[87 87 87 87 87 87 87 87 87 87]
9 голосов
/ 17 февраля 2011

По сути, вы просто хотите разделить блок памяти между процессами и просмотреть его как массив numpy, верно?

В этом случае посмотрите на это (Опубликовано в numpy-обсуждение Надавом ХорешемНекоторое время назад не моя работа).Есть пара похожих реализаций (некоторые более гибкие), но все они по существу используют этот принцип.

#    "Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing"
# Modified and corrected by Nadav Horesh, Mar 2010
# No rights reserved


import numpy as N
import ctypes
import multiprocessing as MP

_ctypes_to_numpy = {
    ctypes.c_char   : N.dtype(N.uint8),
    ctypes.c_wchar  : N.dtype(N.int16),
    ctypes.c_byte   : N.dtype(N.int8),
    ctypes.c_ubyte  : N.dtype(N.uint8),
    ctypes.c_short  : N.dtype(N.int16),
    ctypes.c_ushort : N.dtype(N.uint16),
    ctypes.c_int    : N.dtype(N.int32),
    ctypes.c_uint   : N.dtype(N.uint32),
    ctypes.c_long   : N.dtype(N.int64),
    ctypes.c_ulong  : N.dtype(N.uint64),
    ctypes.c_float  : N.dtype(N.float32),
    ctypes.c_double : N.dtype(N.float64)}

_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(), _ctypes_to_numpy.keys()))


def shmem_as_ndarray(raw_array, shape=None ):

    address = raw_array._obj._wrapper.get_address()
    size = len(raw_array)
    if (shape is None) or (N.asarray(shape).prod() != size):
        shape = (size,)
    elif type(shape) is int:
        shape = (shape,)
    else:
        shape = tuple(shape)

    dtype = _ctypes_to_numpy[raw_array._obj._type_]
    class Dummy(object): pass
    d = Dummy()
    d.__array_interface__ = {
        'data' : (address, False),
        'typestr' : dtype.str,
        'descr' :   dtype.descr,
        'shape' : shape,
        'strides' : None,
        'version' : 3}
    return N.asarray(d)

def empty_shared_array(shape, dtype, lock=True):
    '''
    Generate an empty MP shared array given ndarray parameters
    '''

    if type(shape) is not int:
        shape = N.asarray(shape).prod()
    try:
        c_type = _numpy_to_ctypes[dtype]
    except KeyError:
        c_type = _numpy_to_ctypes[N.dtype(dtype)]
    return MP.Array(c_type, shape, lock=lock)

def emptylike_shared_array(ndarray, lock=True):
    'Generate a empty shared array with size and dtype of a  given array'
    return empty_shared_array(ndarray.size, ndarray.dtype, lock)
5 голосов
/ 13 марта 2013

Из других ответов кажется, что numpy-sharedmem - это путь.

Однако, если вам нужно решение на чистом Python или установка расширений, Cython и т. Д. (Большая) проблема, вы можете использовать следующий код, который является упрощенной версией кода Nadav:

import numpy, ctypes, multiprocessing

_ctypes_to_numpy = {
    ctypes.c_char   : numpy.dtype(numpy.uint8),
    ctypes.c_wchar  : numpy.dtype(numpy.int16),
    ctypes.c_byte   : numpy.dtype(numpy.int8),
    ctypes.c_ubyte  : numpy.dtype(numpy.uint8),
    ctypes.c_short  : numpy.dtype(numpy.int16),
    ctypes.c_ushort : numpy.dtype(numpy.uint16),
    ctypes.c_int    : numpy.dtype(numpy.int32),
    ctypes.c_uint   : numpy.dtype(numpy.uint32),
    ctypes.c_long   : numpy.dtype(numpy.int64),
    ctypes.c_ulong  : numpy.dtype(numpy.uint64),
    ctypes.c_float  : numpy.dtype(numpy.float32),
    ctypes.c_double : numpy.dtype(numpy.float64)}

_numpy_to_ctypes = dict(zip(_ctypes_to_numpy.values(),
                            _ctypes_to_numpy.keys()))


def shm_as_ndarray(mp_array, shape = None):
    '''Given a multiprocessing.Array, returns an ndarray pointing to
    the same data.'''

    # support SynchronizedArray:
    if not hasattr(mp_array, '_type_'):
        mp_array = mp_array.get_obj()

    dtype = _ctypes_to_numpy[mp_array._type_]
    result = numpy.frombuffer(mp_array, dtype)

    if shape is not None:
        result = result.reshape(shape)

    return numpy.asarray(result)


def ndarray_to_shm(array, lock = False):
    '''Generate an 1D multiprocessing.Array containing the data from
    the passed ndarray.  The data will be *copied* into shared
    memory.'''

    array1d = array.ravel(order = 'A')

    try:
        c_type = _numpy_to_ctypes[array1d.dtype]
    except KeyError:
        c_type = _numpy_to_ctypes[numpy.dtype(array1d.dtype)]

    result = multiprocessing.Array(c_type, array1d.size, lock = lock)
    shm_as_ndarray(result)[:] = array1d
    return result

Вы бы использовали это так:

  1. Используйте sa = ndarray_to_shm(a) для преобразования ndarray a в общую многопроцессорную обработку. Массив .
  2. Используйте multiprocessing.Process(target = somefunc, args = (sa, )start, возможно join) для вызова somefunc в отдельном процессе , передавая общий массив.
  3. В somefunc используйте a = shm_as_ndarray(sa), чтобы получить ndarray, указывающий на общие данные. (На самом деле, вы можете захотеть сделать то же самое в исходном процессе, сразу после создания sa, чтобы два ndarrays ссылались на одни и те же данные.)

AFAICS, вам не нужно устанавливать блокировку в True, поскольку shm_as_ndarray не будет использовать блокировку в любом случае. Если вам нужна блокировка, вы должны установить для блокировки значение True и вызвать / разблокировать вызов на sa.

Кроме того, если ваш массив не одномерный, вы можете передать фигуру вместе с sa (например, использовать args = (sa, a.shape)).

Преимущество этого решения в том, что ему не нужны дополнительные пакеты или модули расширения, кроме многопроцессорной обработки (которая есть в стандартной библиотеке).

3 голосов
/ 17 февраля 2011

Использовать темы.Но я предполагаю, что у вас возникнут проблемы с GIL.

Вместо этого: выберите свой яд .

Я знаю из реализаций MPI, с которыми я работаю, что они используютобщая память для связи между узлами.В этом случае вам придется кодировать собственную синхронизацию.

2 ГБ / с звучит так, как будто у вас возникнут проблемы с большинством «простых» методов, в зависимости от ваших ограничений в реальном времени и доступной основной памяти.

1 голос
/ 02 января 2018

Можно рассмотреть возможность использования ОЗУ для временного хранения файлов, которые будут разделены между процессами .ОЗУ - это место, где часть ОЗУ рассматривается как логический жесткий диск, на который можно записывать / читать файлы так же, как и на обычном диске, но со скоростью чтения / записи ОЗУ.

В этой статье описываетсяиспользование программного обеспечения ImDisk (для MS Win) для создания такого диска и получения скорости чтения / записи файлов 6–10 гигабайт в секунду: https://www.tekrevue.com/tip/create-10-gbs-ram-disk-windows/

Пример в Ubuntu: https://askubuntu.com/questions/152868/how-do-i-make-a-ram-disk#152871

Еще одно заметное преимущество заключается в том, что файлы произвольных форматов можно передавать таким способом: например, Picke, JSON, XML, CSV, HDF5 и т. Д. ...

Имейте в виду, что все, что хранится на диске RAM,протер при перезагрузке.

1 голос
/ 03 мая 2015

Использовать темы.Вероятно, у вас не будет проблем с GIL.

GIL влияет только на код Python, но не на библиотеки с поддержкой C / Fortran / Cython.Большинство простых операций и хороший кусок стека Scientific Python на основе C выпускают GIL и могут прекрасно работать на нескольких ядрах. В этом посте более подробно рассматриваются GIL и научный Python.

Редактировать

Простые способы использования потоков включают модуль threading и multiprocessing.pool.ThreadPool.

...