multiprocessing.Pool не работает на последнем элементе итерации - PullRequest
0 голосов
/ 04 февраля 2020

Я пытаюсь запустить функцию func, которая принимает список индексов в качестве аргумента и обрабатывает данные.

def func(rng):
    **some processing**
    write_csv_to_disk(processed_data[rng],mode="a")


import multiprocessing
pool = multiprocessing.Pool(4)
pool.map(func,list_of_lists_of_indices)
pool.close()

Функция сохраняет частичную DataFrame[indices], обработанную параллельно, в файл в режиме append. Код работает хорошо для всех подсписков list_of_lists_of_indices, кроме последнего списка. Данные по индексам в последнем списке не сохраняются в моем файле, и пул закрыт.

list_of_lists_of_indices = [[0,1,2,3,4,.....,99999],[100000,100001,100002,100003,100004,......,199999],.....,[10000000,10000001,...,100000895]]
import multiprocessing
pool = multiprocessing.Pool(4)
pool.map(func,iterable = list_of_lists_of_indices)
pool.close()

1 Ответ

3 голосов
/ 04 февраля 2020

Ну, вы не говорите, что делает write_csv_to_disk, но здесь есть несколько возможных проблем:

  1. у вас есть несколько процессов, записывающих в один и тот же файл одновременно, и это на самом деле не может go хорошо, если вы не предприняли конкретные c шаги (например, файл блокировки), чтобы избежать их перезаписи друг друга
  2. симптомы, которые вы объясняете, выглядят так, как будто вы неправильно закрываете ваши файловые объекты, полагаясь на сборщик мусора, чтобы сделать это и закрыть ваши буферы, за исключением того, что на последней итерации возможно, что, например, рабочий умирает до запуска G C, поэтому файл не закрыт, а его буфер не сброшен в диск
  3. также, в то время как результаты для Pool.map находятся в порядке (с большими затратами) нет никакой гарантии относительно того, в каком порядке они будут выполняться . Поскольку рабочие записывают на диск, нет причин заказывать их. Я даже не понимаю, почему вы используете map, вся цель карты - возвращать результаты вычислений, чего вы здесь не делаете

    Вы не должны использовать Pool.map, а вы не должно быть «сохранение в файл в режиме добавления».

Также обратите внимание, что Pool.close означает, что вы не собираетесь давать новую работу пулу, это не не ждите, когда рабочие будут сделаны . Теперь в теории это не должно иметь значения, если вы используете только методы syn c, однако в этом случае и с учетом (2) это может быть проблемой: когда родительский процесс выходит из пула, он, вероятно, получает сборщик мусора, что означает он жестко отключает работников бассейна .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...