Как использовать разделяемую память вместо передачи объектов путем травления между несколькими процессами - PullRequest
0 голосов
/ 04 декабря 2018

Я работаю над интенсивной проблемой ML, которая связана с аддитивной моделью.Поскольку сложение является основной операцией, я могу разделить входные данные на части и создать несколько моделей, которые затем объединяются переопределенным методом __add__.

Код, относящийся к многопроцессорной обработке, выглядит следующим образом:

def pool_worker(filename, doshuffle):
    print(f"Processing file: {filename}")
    with open(filename, 'r') as f:
        partial = FragmentModel(order=args.order, indata=f, shuffle=doshuffle)
        return partial

def generateModel(is_mock=False, save=True):
    model = None
    with ThreadPool(args.nthreads) as pool:
        from functools import partial
        partial_models = pool.imap_unordered(partial(pool_worker, doshuffle=is_mock), args.input)
        i = 0
        for m in partial_models:
            logger.info(f'Starting to merge model {i}')
            if model is None:
                import copy
                model = copy.deepcopy(m)
            else:
                model += m
            logger.info(f'Done merging...')
            i += 1

    return model

Проблема заключается в том, что потребление памяти масштабируется экспоненциально по мере увеличения порядка модели, поэтому в порядке 4 каждый экземпляр модели составляет около 4-5 ГБ, что приводит к сбою пула потоков, поскольку промежуточные объекты модели неpickleable.

Я читал об этом немного, и кажется, что даже если травление не является проблемой, все равно крайне неэффективно передавать такие данные, как прокомментировал этот ответ .

Однако очень мало указаний относительно того, как можно использовать разделяемую память для этой цели.Можно ли избежать этой проблемы, не меняя внутреннюю часть модельного объекта?

Ответы [ 4 ]

0 голосов
/ 27 апреля 2019

Начиная с Python 3.8 (сейчас альфа), будет multiprocessing.shared_memory, что позволит осуществлять прямой обмен данными между процессами в режиме чтения / записи, аналогично «реальной» многопоточности в других языках (C, Java),

Это будет и быстрее, и проще в использовании, чем обмен данными через диск или сокет или другие коммуникации, требующие сериализации / десериализации и копирования данных.

Пример:

>>> import numpy as np
>>> import multiprocessing as mp
>>> a = np.array([1, 1, 2, 3, 5, 8])  # numpy array on private memory
>>> shm = mp.shared_memory.SharedMemory(create=True, size=a.nbytes)  # allocate shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)  # numpy array on shared memory
>>> b[:] = a[:]  # copy data into shared memory
>>> type(b)
<class 'numpy.ndarray'>
>>> b
array([1, 1, 2, 3, 5, 8])
0 голосов
/ 07 января 2019

Ознакомьтесь с проектом ray , который представляет собой инфраструктуру распределенного выполнения, в которой для сериализации используется apache arrow .Это особенно замечательно, если вы работаете с массивными массивами и, следовательно, является отличным инструментом для рабочих процессов ML.

Вот фрагмент из документов по сериализации объектов

В Ray мы оптимизируем массивные массивы, используя формат данных Apache Arrow.Когда мы десериализуем список массивов numpy из хранилища объектов, мы все равно создаем список объектов numpy массива в Python.Однако вместо того, чтобы копировать каждый пустой массив, каждый объект массив содержит указатель на соответствующий массив, хранящийся в общей памяти.У этой формы сериализации есть некоторые преимущества.

  • Десериализация может быть очень быстрой.
  • Память распределяется между процессами, поэтому все рабочие процессы могут считывать одни и те же данные без необходимости их копировать.

По моему мнению, это даже проще, чем многопроцессорная библиотекадля параллельного выполнения, особенно если вы хотите использовать разделяемую память, ознакомьтесь с руководством по .

0 голосов
/ 09 января 2019

Используйте файлы!

Нет, действительно, используйте файлы - они эффективны (ОС будет кэшировать содержимое) и позволят вам работать над гораздо большими проблемами (набор данных)не обязательно помещается в ОЗУ).

Используйте любой из https://docs.scipy.org/doc/numpy-1.15.0/reference/routines.io.html для выгрузки / загрузки массивов в / из файлов и передачи имен файлов между процессами.

Методы сериализации PS, в зависимости от размера промежуточного массива, самые быстрые могут быть «сырыми» (без затрат на преобразование) или «сжатыми» (если файл заканчивается записью на диск) или чем-то еще.IIRC для загрузки «сырых» файлов может потребоваться заранее знать формат данных (размеры, размеры).

0 голосов
/ 04 января 2019

Вы должны использовать прокси-объект Manager для общих редактируемых объектов: https://docs.python.org/3/library/multiprocessing.html#multiprocessing-managers Блокировка доступа будет обрабатываться этим прокси-объектом Manager.

В Настраиваемые менеджеры раздел имеетсяпример, который вам подходит:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

После этого вам нужно подключиться к этому менеджеру из разных процессов (как показано в с использованием удаленного менеджера ) и отредактировать его, как вы хотите.

...