Мой код реализован на Python 3.6, запущенном в Debian.
Я довольно много читал об этом, но не нашел идеального решения для моей проблемы. У меня есть две «проблемы» (одна в отношении распараллеливания, другая в отношении блокировки), которые должны быть объединены в одно решение. Также здесь я думаю, что проблема «блокировки» зависит от того, как я решаю распараллеливание.
Задача 1:
У меня есть функция funct()
, выполняющая некоторые вычисления для отдельного текстового файла для каждого вызова функции. Затем функция манипулирует двумя переменными (var1
и var2
) в зависимости от результата и выводит некоторые выходные данные в файл hdf5 (hdf5_file
).
Теперь, когда мой код протестирован, я хочу ускорить процесс, запустив эту функцию параллельно. Дело в том, что в зависимости от входного файла расчет может занять от 10 секунд до получаса.
Таким образом, простое «разветвление n процессов и ожидание, пока все не будет сделано» не является хорошим решением, потому что тогда экземпляр, который работает в течение 10 секунд, должен ждать, пока экземпляр, который выполняется полчаса, не завершится.
Так что было бы лучше определить пул из n подпроцессов («рабочих»?), Которые, как только они закончат, перейдут на следующую работу. Таким образом, n процессов будут работать постоянно, пока все задания не будут выполнены.
Задача 2:
Кроме того, мне нужно изменить две переменные var1
и var2
и записать некоторые выходные данные в файл hdf5_file
(см. Выше). Это означает, что мне нужен какой-то механизм блокировки внутри функции, который управляет манипулированием «внешними» переменными и файлом hdf5 между n процессами.
Вот некоторый псевдокод, который отражает мою проблему:
def funct(input_file):
(x,y,z) = dostuff(input_file)
waitforlock()
if(x == 1):
var1 += 1# variable defined in parent python code, could also be returned to the calling process
if(y == 5)
var2 += 1 # variable defined in parent python code, could also be returned to the calling process
addstuff("path/to/hdf5", z)
unlock()
mypool = pool(8) # get 8 "worker processes"
for f in files:
wait_for_free_worker(mypool)
run_worker(f,funct)
wait_until_all_workers_finished()