Я выполняю анализ временных рядов симуляций. По сути, он выполняет одни и те же задачи для каждого временного шага. Поскольку существует очень большое количество временных шагов, и анализ каждого из них не зависит, я хотел создать функцию, которая может мультипроцессировать другую функцию. Последний будет иметь аргументы и возвращать результат.
Используя общий словарь и lib concurrent.futures, мне удалось написать это:
import concurrent.futures as Cfut
def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args):
# function : function that is running in parallel
# param_list : list of items
# group_size : size of the groups
# Nworkers : number of group/items running in the same time
# **param_fixed : passing parameters
manager = mlp.Manager()
dic = manager.dict()
executor = Cfut.ProcessPoolExecutor(Nworkers)
futures = [executor.submit(function, param, dic, *args)
for param in grouper(param_list, group_size)]
Cfut.wait(futures)
return [dic[i] for i in sorted(dic.keys())]
Как правило, я могу использовать это так:
def read_file(files, dictionnary):
for file in files:
i = int(file[4:9])
#print(str(i))
if 'bz2' in file:
os.system('bunzip2 ' + file)
file = file[:-4]
dictionnary[i] = np.loadtxt(file)
os.system('bzip2 ' + file)
Map = np.array(multiprocess_loop_grouped(read_file, list_alti, Group_size, N_thread))
или как это:
def autocorr(x):
result = np.correlate(x, x, mode='full')
return result[result.size//2:]
def find_lambda_finger(indexes, dic, Deviation):
for i in indexes :
#print(str(i))
# Beach = Deviation[i,:] - np.mean(Deviation[i,:])
dic[i] = Anls.find_first_max(autocorr(Deviation[i,:]), valmax = True)
args = [Deviation]
Temp = Rescal.multiprocess_loop_grouped(find_lambda_finger, range(Nalti), Group_size, N_thread, *args)
В принципе, это работает. Но это не работает хорошо. Иногда это вылетает. Иногда он на самом деле запускает число процессов Python, равных Nworkers, а иногда только 2 или 3 из них работают одновременно, пока я указал Nworkers = 15
.
Например, классическая ошибка, которую я получаю, описана в следующей теме, которую я поднял: Вызов matplotlib ПОСЛЕ многопроцессорной обработки иногда приводит к ошибке: основной поток не в основном цикле
Какой более Pythonic способ достичь того, чего я хочу? Как я могу улучшить контроль этой функции? Как я могу больше контролировать количество запущенных процессов Python?