многопроцессорная обработка большого набора данных через функцию сложного класса с итератором - PullRequest
0 голосов
/ 23 октября 2019

Я искал, вероятно, 10 потоков при многопроцессорном поиске, но, похоже, ничего не подходит для моего варианта использования. Вот общее представление о том, что я хочу распараллелить.

class foo():
    def boo():
        filename = 'path to the data file'
        with reader(filename) as fileReader:
            for id, feature in fileReader:
                 boo2(id, feature)
    def boo2(id, feature):
        *process feature then save the output to a folder*

Здесь я хочу распараллелить вызов к boo2(), где fileReader - итератор ( sequentialMatrixReader из pykaldi) с десятками тысяч строк id и feature, где id - это строка, а каждая feature - это матрица (сотни строк x десятки столбцов). boo2 вычислит матрицу меньшего размера и сохранит результат в папке на основе id. Каждый вызов boo2 не зависит друг от друга, поэтому я хочу распараллелить его.

Насколько я понимаю, я не могу использовать multiprocessing.Pool, поскольку boo2 - это функция класса, и я не могу вытащить ее из класса из-за ее сложности.

Я не знаю, как использовать multiprocessing.Process, так как количество ядер намного меньше, чем число строк итератора, и я не уверен, как ставить новые вызовы на boo2, как только яstart() и join() процессов (я пытался разбить fileReader на n пакетов и установить процесс на пакет, однако я бы предпочел ставить вызовы в одну строку по сравнению с несколькими пакетами)

Я также изучил модуль pathos, поскольку у него нет проблем с функциями класса. Однако из примеров использования наиболее подходящим для меня является:

pathos.threading.ThreadPoolpool.imap(boo2, [feature for feature in fileReader])

Но из-за того, что у меня fileReader, я не могу разместить [feature for feature in fileReader] в памяти.

Любой и всепомощь приветствуется. Спасибо.

1 Ответ

1 голос
/ 23 октября 2019

Вы не сможете использовать multiprocessing из-за членов класса, вам нужна отдельная функция для этого - вы правы в этом.

Что касается использования потоков, я быпредлагаем вам не использовать простое понимание [feature for feature in fileReader], а читать функции из fileReader в пакетах в соответствии с имеющимися у вас потоками ЦП, затем запускать потоки, ждать завершения и затем читать следующий пакет и т. д.

Что-то вроде:

def make_next_batch( fileReader ) :
    batch = []
    for feature in fileReader :
        if len(batch) == BATCH_SIZE :
            yield batch
            batch = []
        batch.append( feature )
    if len(batch) :
        yield batch

Тогда вам нужно одновременно хранить только BATCH_SIZE функций в памяти.

...