NumPy против многопроцессорных и Mmap - PullRequest
20 голосов
/ 01 апреля 2012

Я использую модуль Python multiprocessing для параллельной обработки больших числовых массивов. Массивы отображаются в память с помощью numpy.load(mmap_mode='r') в главном процессе. После этого multiprocessing.Pool() разветвляется (я полагаю).

Кажется, все работает нормально, за исключением того, что я получаю строки вроде:

AttributeError("'NoneType' object has no attribute 'tell'",)
  in `<bound method memmap.__del__ of
       memmap([ 0.57735026,  0.57735026,  0.57735026,  0.        ,  0.        ,        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,        0.        ,  0.        ], dtype=float32)>`
     ignored

в логах юниттеста. Тем не менее, испытания проходят нормально.

Есть идеи, что там происходит?

Использование Python 2.7.2, OS X, NumPy 1.6.1.


UPDATE:

После некоторой отладки я выследил причину до пути к коду, который использовал (небольшой кусочек) этот отображаемый в памяти массив numpy в качестве входных данных для вызова Pool.imap.

Очевидно, «проблема» в том, как multiprocessing.Pool.imap передает свой вклад новым процессам: он использует pickle. Это не работает с mmap ed numpy массивами, и что-то внутри разрывается, что приводит к ошибке.

Я нашел этот ответ Роберта Керна, который, похоже, решает ту же проблему. Он предлагает создать специальный путь кода для случая, когда вход imap поступает из массива с отображением в память: отображение памяти в том же массиве вручную в порожденном процессе.

Это было бы настолько сложно и безобразно, что я бы предпочел жить с ошибкой и дополнительными копиями памяти. Есть ли другой способ облегчить изменение существующего кода?

1 Ответ

22 голосов
/ 02 апреля 2012

Мой обычный подход (если вы можете жить с дополнительными копиями памяти) - выполнять все операции ввода-вывода в одном процессе, а затем отправлять их в пул рабочих потоков. Для загрузки фрагмента memmapped массива в память просто выполните x = np.array(data[yourslice]) (data[yourslice].copy() на самом деле не делает этого, что может привести к некоторой путанице.).

Прежде всего, давайте сгенерируем некоторые тестовые данные:

import numpy as np
np.random.random(10000).tofile('data.dat')

Вы можете воспроизвести свои ошибки примерно так:

import numpy as np
import multiprocessing

def main():
    data = np.memmap('data.dat', dtype=np.float, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    intervals = range(0, data.size, chunksize) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield data[start:stop]

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()

И если вы просто переключитесь на уступку np.array(data[start:stop]), вы исправите проблему:

import numpy as np
import multiprocessing

def main():
    data = np.memmap('data.dat', dtype=np.float, mode='r')
    pool = multiprocessing.Pool()
    results = pool.imap(calculation, chunks(data))
    results = np.fromiter(results, dtype=np.float)

def chunks(data, chunksize=100):
    """Overly-simple chunker..."""
    intervals = range(0, data.size, chunksize) + [None]
    for start, stop in zip(intervals[:-1], intervals[1:]):
        yield np.array(data[start:stop])

def calculation(chunk):
    """Dummy calculation."""
    return chunk.mean() - chunk.std()

if __name__ == '__main__':
    main()

Конечно, это делает дополнительную копию в памяти каждого чанка.

В долгосрочной перспективе вы, вероятно, обнаружите, что проще переключиться с файлов меммапа и перейти на что-то вроде HDF. Это особенно верно, если ваши данные многомерны. (Я бы порекомендовал h5py, но pyTables было бы неплохо, если бы ваши данные были "табличными".)

Удачи, во всяком случае!

...