Как извлечь значения из функции, запущенной в параллельных процессах? - PullRequest
0 голосов
/ 13 ноября 2018

Модуль многопроцессорности довольно запутан для начинающих на python, особенно для тех, кто только что перешел с MATLAB и стал ленивым благодаря своему параллельному инструментарию вычислений. У меня есть следующая функция, выполнение которой занимает ~ 80 секунд, и я хочу сократить это время, используя многопроцессорный модуль Python.

from time import time

xmax   = 100000000

start = time()
for x in range(xmax):
    y = ((x+5)**2+x-40)
    if y <= 0xf+1:
        print('Condition met at: ', y, x)
end  = time()
tt   = end-start #total time
print('Each iteration took: ', tt/xmax)
print('Total time:          ', tt)

Это выводит как ожидалось:

Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2
Each iteration took:  8.667453265190124e-07
Total time:           86.67453265190125

Поскольку любая итерация цикла не зависит от других, я попытался принять этот Серверный процесс из официальной документации для сканирования фрагментов диапазона в отдельных процессах. И, наконец, я пришел к ответу vartec на этот вопрос и смог подготовить следующий код. Я также обновил код, основываясь на ответе Darkonaut на текущий вопрос.

from time import time 
import multiprocessing as mp

def chunker (rng, t): # this functions makes t chunks out of rng
    L  = rng[1] - rng[0]
    Lr = L % t
    Lm = L // t
    h  = rng[0]-1
    chunks = []
    for i in range(0, t):
        c  = [h+1, h + Lm]
        h += Lm
        chunks.append(c)
    chunks[t-1][1] += Lr + 1
    return chunks

def worker(lock, xrange, return_dict):
    '''worker function'''
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            return_dict['x'].append(x)
            return_dict['y'].append(y)
            with lock:                
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y

if __name__ == '__main__':
    start = time()
    manager = mp.Manager()
    return_dict = manager.dict()
    lock = manager.Lock()
    return_dict['x']=manager.list()
    return_dict['y']=manager.list()
    xmax = 100000000
    nw = mp.cpu_count()
    workers = list(range(0, nw))
    chunks = chunker([0, xmax], nw)
    jobs = []
    for i in workers:
        p = mp.Process(target=worker, args=(lock, chunks[i],return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    end = time()
    tt   = end-start #total time
    print('Each iteration took: ', tt/xmax)
    print('Total time:          ', tt)
    print(return_dict['x'])
    print(return_dict['y'])

, что значительно сокращает время выполнения до ~ 17 секунд. Но моя общая переменная не может получить никаких значений. Пожалуйста, помогите мне выяснить, какая часть кода работает неправильно.

вывод, который я получаю:

Each iteration took:  1.7742713451385497e-07
Total time:           17.742713451385498
[]
[]

от которого я ожидаю:

Each iteration took:  1.7742713451385497e-07
Total time:           17.742713451385498
[0, 1, 2]
[-15, -3, 11]

1 Ответ

0 голосов
/ 14 ноября 2018

Проблема в вашем примере заключается в том, что изменения стандартных изменяемых структур в Manager.dict распространяться не будут. Сначала я покажу вам, как это исправить с помощью менеджера, чтобы потом показать вам лучшие варианты.

multiprocessing.Manager немного тяжел, поскольку он использует отдельный процесс только для Manager, а для работы с общим объектом необходимо использовать блокировки для согласованности данных. Если вы запускаете это на одной машине, есть лучшие варианты с multiprocessing.Pool, в случае, если вам не нужно запускать настроенные Process классы, и если вам нужно, multiprocessing.Process вместе с multiprocessing.Queue будет обычным способом делать это.

Цитируемые части из многопроцессорной документы.


Менеджер

Если стандартный (не-прокси) список или объекты dict содержатся в референте, изменения в этих изменяемых значениях не будут распространяться через диспетчер, поскольку прокси-сервер не может знать, когда изменяются содержащиеся в нем значения. Однако сохранение значения в прокси-контейнере (которое вызывает setitem на прокси-объекте) распространяется через менеджер, и поэтому для эффективного изменения такого элемента можно было бы переназначить измененное значение контейнеру. прокси ...

В вашем случае это будет выглядеть так:

def worker(xrange, return_dict, lock):
    """worker function"""
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y

Здесь lock будет экземпляром manager.Lock, который вы должны передать в качестве аргумента, поскольку вся (теперь) заблокированная операция сама по себе не является атомарной. ( Здесь более простой пример с Manager с использованием Lock)

Этот подход, возможно, менее удобен, чем использование вложенных прокси-объектов для большинства случаев использования, но также демонстрирует уровень контроля над синхронизацией.

Поскольку прокси-объекты Python 3.6 являются вложенными:

Изменено в версии 3.6: общие объекты могут быть вложенными. Например, общий контейнерный объект, такой как общий список, может содержать другие общие объекты, которые все будут управляться и синхронизироваться SyncManager.

Начиная с Python 3.6, вы можете заполнить manager.dict перед началом многопроцессорной обработки с manager.list в качестве значений, а затем добавить непосредственно в рабочий без переназначения.

return_dict['x'] = manager.list()
return_dict['y'] = manager.list()

EDIT:

Вот полный пример с Manager:

import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_util.py from first link in code-snippet for "Pool"
# section below
from mp_utils import calc_batch_sizes, build_batch_ranges

# def context_timer ... see code snippet in "Pool" section below

def worker(batch_range, return_dict, lock):
    """worker function"""
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                return_dict['x'].append(x)
                return_dict['y'].append(y)


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with Manager() as manager:
        lock = manager.Lock()
        return_dict = manager.dict()
        return_dict['x'] = manager.list()
        return_dict['y'] = manager.list()

        tasks = [(batch_range, return_dict, lock)
                 for batch_range in batch_ranges]

        with context_timer():

            pool = [Process(target=worker, args=args)
                    for args in tasks]

            for p in pool:
                p.start()
            for p in pool:
                p.join()

        # Create standard container with data from manager before exiting
        # the manager.
        result = {k: list(v) for k, v in return_dict.items()}

    print(result)

Бассейн

Чаще всего multiprocessing.Pool просто сделает это. У вас есть дополнительная проблема в вашем примере, так как вы хотите распределить итерации по диапазону. Ваша chunker функция не может разделить диапазон, даже если каждый процесс выполняет одну и ту же работу:

chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]]  # 4, 4, 4, 6!

Для приведенного ниже кода, пожалуйста, возьмите фрагмент кода для mp_utils.py из моего ответа здесь , он обеспечивает две функции для чанковских диапазонов, насколько это возможно.

С помощью multiprocessing.Pool ваша worker функция просто должна вернуть результат, а Pool позаботится о передаче результата обратно по внутренним очередям обратно в родительский процесс. result будет списком, поэтому вам придется снова изменить порядок результатов так, как вы хотите. Ваш пример может выглядеть так:

import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges

@contextmanager
def context_timer():
    start_time = time.perf_counter()
    yield
    end_time = time.perf_counter()
    total_time   = end_time-start_time
    print(f'\nEach iteration took: {total_time / X_MAX:.4f} s')
    print(f'Total time:          {total_time:.4f} s\n')


def worker(batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    return result


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():
        with Pool(N_WORKERS) as pool:
            results = pool.map(worker, iterable=batch_ranges)

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # filter and sort results
    print(f'results sorted: x: {x}, y: {y}')

Пример вывода:

[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000), 
range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000), range(75000000, 87500000), range(87500000, 100000000)]
Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2

Each iteration took: 0.0000 s
Total time:          8.2408 s

results: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)

Process finished with exit code 0

Если бы у вас было несколько аргументов для вашего worker, вы бы создали список «задач» с аргументами-кортежами и обменялись pool.map(...) с pool.starmap(...iterable=tasks). См. Документы для получения дополнительной информации об этом.


Обработка и очередь

Если по какой-то причине вы не можете использовать multiprocessing.Pool, вы должны принять позаботьтесь о межпроцессном взаимодействии (МПК) самостоятельно, передав multiprocessing.Queue в качестве аргумента для ваших рабочих функций в child- процессы и позволяя им ставить в очередь свои результаты для отправки обратно в Родитель.

Вам также нужно будет построить свою структуру, похожую на пул, чтобы вы могли перебирать ее, чтобы запускать и присоединяться к процессам, и вам нужно get() вернуть результаты из очереди. Подробнее о Queue.get использовании я написал здесь .

Решение с таким подходом может выглядеть следующим образом:

def worker(result_queue, batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    result_queue.put(result)  # <--


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    result_queue = mp.Queue()  # <--
    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():

        pool = [Process(target=worker, args=(result_queue, batch_range))
                for batch_range in batch_ranges]

        for p in pool:
            p.start()

        results = [result_queue.get() for _ in batch_ranges]

        for p in pool:
            p.join()

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # filter and sort results
    print(f'results sorted: x: {x}, y: {y}')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...