Я довольно новичок в питоне.
Я использую модуль многопроцессорной обработки для чтения строк текста в stdin, преобразования их некоторым образом и записи в базу данных. Вот фрагмент моего кода:
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
batch.append(content)
if len(batch) >= 10000:
pool.apply_async(insert, args=(batch,i+1))
batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()
Теперь, когда все работает нормально, пока я не обработаю огромные входные файлы (сотни миллионов строк), которые я направляю в свою программу на python. В какой-то момент, когда моя база данных работает медленнее, я вижу, что память переполняется.
После некоторой игры выяснилось, что pool.apply_async и pool.map_async никогда не блокируются, поэтому очередь обрабатываемых вызовов увеличивается и увеличивается.
Какой правильный подход к моей проблеме? Я ожидаю, что параметр, который я могу установить, будет блокировать вызов pool.apply_async, как только будет достигнута определенная длина очереди. AFAIR в Java, можно предоставить ThreadPoolExecutor BlockingQueue с фиксированной длиной для этой цели.
Спасибо!