Функция, которая мультипроцессирует другую функцию - PullRequest
0 голосов
/ 29 октября 2018

Я выполняю анализ временных рядов симуляций. По сути, он выполняет одни и те же задачи для каждого временного шага. Поскольку существует очень большое количество временных шагов, и анализ каждого из них не зависит, я хотел создать функцию, которая может мультипроцессировать другую функцию. Последний будет иметь аргументы и возвращать результат.

Используя общий словарь и lib concurrent.futures, мне удалось написать это:

import concurrent.futures as Cfut
def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args):
    # function : function that is running in parallel
    # param_list : list of items
    # group_size : size of the groups
    # Nworkers : number of group/items running in the same time
    # **param_fixed : passing parameters

    manager = mlp.Manager()
    dic = manager.dict()
    executor = Cfut.ProcessPoolExecutor(Nworkers)

    futures = [executor.submit(function, param, dic, *args)
           for param in grouper(param_list, group_size)]

    Cfut.wait(futures)
    return [dic[i] for i in sorted(dic.keys())]

Как правило, я могу использовать это так:

def read_file(files, dictionnary):
    for file in files:
        i = int(file[4:9])
        #print(str(i))
        if 'bz2' in file:
            os.system('bunzip2 ' + file)
            file = file[:-4]
        dictionnary[i] = np.loadtxt(file)
        os.system('bzip2 ' + file)

Map = np.array(multiprocess_loop_grouped(read_file, list_alti, Group_size, N_thread))

или как это:

def autocorr(x):
    result = np.correlate(x, x, mode='full')
    return result[result.size//2:]

def find_lambda_finger(indexes, dic, Deviation):
    for i in indexes :
        #print(str(i))
        # Beach = Deviation[i,:] - np.mean(Deviation[i,:])
        dic[i] = Anls.find_first_max(autocorr(Deviation[i,:]), valmax = True)

args = [Deviation]
Temp = Rescal.multiprocess_loop_grouped(find_lambda_finger, range(Nalti), Group_size, N_thread, *args)

В принципе, это работает. Но это не работает хорошо. Иногда это вылетает. Иногда он на самом деле запускает число процессов Python, равных Nworkers, а иногда только 2 или 3 из них работают одновременно, пока я указал Nworkers = 15.

Например, классическая ошибка, которую я получаю, описана в следующей теме, которую я поднял: Вызов matplotlib ПОСЛЕ многопроцессорной обработки иногда приводит к ошибке: основной поток не в основном цикле

Какой более Pythonic способ достичь того, чего я хочу? Как я могу улучшить контроль этой функции? Как я могу больше контролировать количество запущенных процессов Python?

1 Ответ

0 голосов
/ 29 октября 2018

Одной из основных концепций мультиобработки в Python является использование очередей. Он работает довольно хорошо, когда у вас есть входной список, который можно повторять и который не нужно изменять подпроцессами. Это также дает вам хороший контроль над всеми процессами, потому что вы порождаете нужное вам число, вы можете запустить их в режиме ожидания или остановить их.

Также намного проще отлаживать. Явный обмен данными обычно является подходом, который гораздо сложнее правильно настроить.

Очереди могут содержать что угодно, так как по определению они итерируемые. Таким образом, вы можете заполнить их строками filepath для чтения файлов, не повторяемыми числами для выполнения вычислений или даже изображениями для рисования.

В вашем случае макет может выглядеть так:

import multiprocessing as mp
import numpy as np
import itertools as it


def worker1(in_queue, out_queue):
    #holds when nothing is available, stops when 'STOP' is seen
    for a in iter(in_queue.get, 'STOP'):
        #do something
        out_queue.put({a: result}) #return your result linked to the input

def worker2(in_queue, out_queue):
    for a in iter(in_queue.get, 'STOP'):
        #do something differently
        out_queue.put({a: result}) //return your result linked to the input

def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args):
    # your final result
    result = {}

    in_queue = mp.Queue()
    out_queue = mp.Queue()

    # fill your input
    for a in param_list:
        in_queue.put(a)
    # stop command at end of input
    for n in range(Nworkers):
        in_queue.put('STOP')

    # setup your worker process doing task as specified
    process = [mp.Process(target=function,
               args=(in_queue, out_queue), daemon=True) for x in range(Nworkers)]

    # run processes
    for p in process:
        p.start()

    # wait for processes to finish
    for p in process:
        p.join()

    # collect your results from the calculations
    for a in param_list:
        result.update(out_queue.get())

    return result

temp = multiprocess_loop_grouped(worker1, param_list, group_size, Nworkers, *args)
map = multiprocess_loop_grouped(worker2, param_list, group_size, Nworkers, *args)

Это можно сделать немного более динамичным, если вы боитесь, что в ваших очередях не хватит памяти. Чем вам нужно заполнить и очистить очереди во время работы процессов. Смотрите этот пример здесь .

Заключительные слова: это не более Pythonic, как вы просили. Но для новичка это легче понять; -)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...