У меня есть многопроцессорный код, который выполняет следующее:
- Разбор строки огромных файлов построчно
- Добавление строк в список при условии выполнения условия
- Когда условие
False
, apply_async()
является функцией списка, в Pool()
def main_function(func_args):
[...]
# some heavy calculations
# queue is among the func_args
queue.put(result)
if __name__ == "__main__":
manager = mp.Manager()
lock = manager.Lock()
queue = manager.Queue()
pool = manager.Pool(processes=20, maxtasksperchild=20)
for line in open(infile, "r"):
if condition == True:
# fill list
elif condition == False:
pool.apply_async(target=main_function, args=func_args)
else:
# run last list before closing file
pool.apply_async(target=main_function, args=func_args)
pool.close()
pool.join()
Теперь код работает per se , но я заметил пару вещей:
- Пока все 20 первых процессов не завершены, новые 20 не появляются. Правильно ли я делаю
apply_async
? - Использование памяти довольно велико даже при небольшом подмножестве данных: генерирует ли каждый процесс «личную» копию аргументов, которая поступает в память, если я не поделюсьэто с
Manager()
?
Что мне нужно, это:
- Я бы хотел, чтобы каждый процесс запускался как можно скорее, и до 20 водин раз (20 передается из командной строки от пользователя через
argparse
)
Любой совет от более опытных людей? :)