У меня есть большой набор данных, по которому я хочу, чтобы мой сценарий прошел итерацию, выполнил серию операций с каждой записью, а затем упорядочил результаты для хранения на жестком диске.Поскольку наборы данных могут быть относительно большими (~ 250 ГБ), для доступности ОЗУ требуется, чтобы набор данных обрабатывался кусками (то, что я назвал dataBlock в приведенном ниже коде) из 1000 записей за раз.Я также использую класс multiprocessing.Pool
для облегчения использования нескольких ядер ЦП для этой задачи.
По сути, все устроено так, что каждый блок данных передается в пул, пул выполняет необходимые вычисления над блоком данных с использованием метода imap
, пул возвращает результаты вычислений и результаты дляблок данных добавляется в список.Этот список (processed_data
) является желаемым конечным продуктом набора вычислений.
processed_data = []
multiprocessing.Pool(processor_cap) as pool:
for blockIndex, block in enumerate(range(1000, height-remainder, 1000)):
#Read-in 1000 spectra from source dataset
dataBlock = np.asarray(raw_dset[blockIndex*1000:block][:])
'''
Pass data block to processor pool, which iterates through data
block. Each spectrum is handed off to a CPU in the pool,
which centroids it and appends the result to "processed_block".
'''
processed_block = pool.imap(centroid_spectrum, dataBlock)
#Append processed spectra to processed data bin
for idx, processed_spectrum in enumerate(processed_block):
processed_data.append(processed_spectrum)
Что мне хотелось бы знать, так это как сделать паузу в скрипте после вызова pool.imap()
дополный processed_block
был возвращен без закрытия пула.В настоящее время он переходит прямо в цикл for
, который следует непосредственно в приведенном выше фрагменте кода, не ожидая, когда pool.imap
вернет processed_block
.Я попытался вызвать pool.join()
сразу после вызова pool.imap()
, но он возвращает только ***AssertionError
и снова продолжает цикл for
под ним.Я могу в конечном итоге успешно вызвать pool.close()
и pool.join()
позже в скрипте, как только все блоки данных будут переданы в пул, чуть ниже конца самого внешнего цикла for
выше.
Заранее спасибо за помощь!