Многопроцессорный пул Python;дождитесь завершения итерации - PullRequest
0 голосов
/ 25 сентября 2018

У меня есть большой набор данных, по которому я хочу, чтобы мой сценарий прошел итерацию, выполнил серию операций с каждой записью, а затем упорядочил результаты для хранения на жестком диске.Поскольку наборы данных могут быть относительно большими (~ 250 ГБ), для доступности ОЗУ требуется, чтобы набор данных обрабатывался кусками (то, что я назвал dataBlock в приведенном ниже коде) из 1000 записей за раз.Я также использую класс multiprocessing.Pool для облегчения использования нескольких ядер ЦП для этой задачи.

По сути, все устроено так, что каждый блок данных передается в пул, пул выполняет необходимые вычисления над блоком данных с использованием метода imap, пул возвращает результаты вычислений и результаты дляблок данных добавляется в список.Этот список (processed_data) является желаемым конечным продуктом набора вычислений.

processed_data = []

multiprocessing.Pool(processor_cap) as pool:

    for blockIndex, block in enumerate(range(1000, height-remainder, 1000)):

        #Read-in 1000 spectra from source dataset
        dataBlock = np.asarray(raw_dset[blockIndex*1000:block][:])

        '''
        Pass data block to processor pool, which iterates through data
        block. Each spectrum is handed off to a CPU in the pool,
        which centroids it and appends the result to "processed_block".
        '''
        processed_block = pool.imap(centroid_spectrum, dataBlock)

        #Append processed spectra to processed data bin
        for idx, processed_spectrum in enumerate(processed_block):
            processed_data.append(processed_spectrum)

Что мне хотелось бы знать, так это как сделать паузу в скрипте после вызова pool.imap() дополный processed_block был возвращен без закрытия пула.В настоящее время он переходит прямо в цикл for, который следует непосредственно в приведенном выше фрагменте кода, не ожидая, когда pool.imap вернет processed_block.Я попытался вызвать pool.join() сразу после вызова pool.imap(), но он возвращает только ***AssertionError и снова продолжает цикл for под ним.Я могу в конечном итоге успешно вызвать pool.close() и pool.join() позже в скрипте, как только все блоки данных будут переданы в пул, чуть ниже конца самого внешнего цикла for выше.

Заранее спасибо за помощь!

Ответы [ 2 ]

0 голосов
/ 26 сентября 2018

Я просто изменил вызов Pool.imap() на вызов Pool.map(), и сценарий запустился так, как задумано.Смотрите мой обмен с Михаилом Бурштейном для получения дополнительной информации.

0 голосов
/ 25 сентября 2018

Трудно работать с вашим примером, не прилагая больших усилий, чтобы изменить ситуацию;но если у вас есть Iterator из вызова imap (), вы можете рассмотреть возможность преобразования элемента итератора в список до того, как достигнете цикла for:

processed_block = pool.imap(centroid_spectrum, dataBlock)
processed_block = [ x for x in processed_block ] # convert from an iterator to a list
for idx, processed_spectrum in enumerate(processed_block):

и т. д.

Это достигает того, что вы хотели?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...