Отсутствие прироста производительности после использования многопроцессорной обработки для функции, ориентированной на очередь - PullRequest
0 голосов
/ 15 сентября 2018

Реальный код, который я хочу оптимизировать, слишком сложен, чтобы включать его здесь, поэтому вот упрощенный пример:

def enumerate_paths(n, k):
    """
    John want to go up a flight of stairs that has N steps. He can take
    up to K steps each time. This function enumerate all different ways
    he can go up this flight of stairs.
    """
    paths = []
    to_analyze = [(0,)]

    while to_analyze:
        path = to_analyze.pop()
        last_step = path[-1]

        if last_step >= n:
            # John has reach the top
            paths.append(path)
            continue

        for i in range(1, k + 1):
            # possible paths from this point
            extended_path = path + (last_step + i,)
            to_analyze.append(extended_path)

    return paths

и вывод выглядит так

>>> enumerate_paths(3, 2)
[(0, 2, 4), (0, 2, 3), (0, 1, 3), (0, 1, 2, 4), (0, 1, 2, 3)]

Результат может показаться запутанным, поэтому здесь есть объяснение. Например, (0, 1, 2, 4) означает, что Джон может посетить и поставить ногу на первый, второй и четвертый этап хронологии, и, наконец, он останавливается на шаге 4, потому что ему нужно только подняться на 3 шага.

Я пытался включить multiprocessing в этот фрагмент, но не наблюдал никакого увеличения производительности, даже немного!

import multiprocessing

def enumerate_paths_worker(n, k, queue):
    paths = []

    while not queue.empty():
        path = queue.get()
        last_step = path[-1]

        if last_step >= n:
            # John has reach the top
            paths.append(path)
            continue

        for i in range(1, k + 1):
            # possible paths from this point
            extended_path = path + (last_step + i,)
            queue.put(extended_path)

    return paths


def enumerate_paths(n, k):
    pool = multiprocessing.Pool()
    manager = multiprocessing.Manager()
    queue = manager.Queue()

    path_init = (0,)
    queue.put(path_init)
    apply_result = pool.apply_async(enumerate_paths_worker, (n, k, queue))

    return apply_result.get()

Список Python to_analysis действует так же, как очередь задач, и каждый элемент в этой очереди может обрабатываться отдельно, поэтому я думаю, что эта функция потенциально может быть оптимизирована за счет использования многопоточности / обработки. Также обратите внимание, что порядок пунктов не имеет значения. Фактически, при его оптимизации вы можете вернуть набор Python, массив Numpy или фрейм данных Pandas, если они представляют один и тот же набор путей.

Бонусный вопрос : Какую производительность я могу получить, используя такие научные пакеты, как Numpy, Pandas или Scipy, для таких задач?

1 Ответ

0 голосов
/ 15 сентября 2018

TL; DR

Если ваш реальный алгоритм не требует более дорогих вычислений, чем вы показали нам в своем примере, накладные расходы на связь для многопроцессорной обработки будут доминировать, и ваши вычисления потребуют много раздольше, чем последовательное выполнение.


Ваша попытка с apply_async фактически использует только одного работника из вашего пула, поэтому вы не видите разницы.apply_async только по замыслу кормит одного работника.Кроме того, недостаточно просто передать последовательную версию в пул, если ваши работники должны поделиться промежуточными результатами, поэтому вам придется изменить целевую функцию, чтобы включить это.

Но, как уже говорилось во введении, ваши вычисления выиграют от многопроцессорности только в том случае, если они достаточно тяжелы, чтобы окупить накладные расходы на межпроцессное взаимодействие (и создание процесса).

Мое решение нижедля общей проблемы используется JoinableQueue в сочетании со значением часового для завершения процесса, чтобы синхронизировать рабочий процесс.Я добавляю функцию busy_foo, чтобы сделать вычисления более тяжелыми, чтобы показать случай, когда многопроцессорность имеет свои преимущества.

from multiprocessing import Process
from multiprocessing import JoinableQueue as Queue
import time

SENTINEL = 'SENTINEL'

def busy_foo(x = 10e6):
    for _ in range(int(x)):
        x -= 1


def enumerate_paths(q_analyze, q_result, n, k):
    """
    John want to go up a flight of stairs that has N steps. He can take
    up to K steps each time. This function enumerate all different ways
    he can go up this flight of stairs.
    """
    for path in iter(q_analyze.get, SENTINEL):
        last_step = path[-1]

        if last_step >= n:
            busy_foo()
            # John has reach the top
            q_result.put(path)
            q_analyze.task_done()
            continue
        else:
            busy_foo()
            for i in range(1, k + 1):
                # possible paths from this point
                extended_path = path + (last_step + i,)
                q_analyze.put(extended_path)
            q_analyze.task_done()


if __name__ == '__main__':

    N_CORES = 4

    N = 6
    K = 2

    start = time.perf_counter()
    q_analyze = Queue()
    q_result = Queue()

    q_analyze.put((0,))

    pool = []
    for _ in range(N_CORES):
        pool.append(
            Process(target=enumerate_paths, args=(q_analyze, q_result, N, K))
        )

    for p in pool:
        p.start()

    q_analyze.join() # block until everything is processed

    for p in pool:
        q_analyze.put(SENTINEL)  # let the processes exit gracefully

    results = []
    while not q_result.empty():
        results.append(q_result.get())

    for p in pool:
        p.join()

    print(f'elapsed: {time.perf_counter() - start: .2f} s')

Результаты

Если яиспользуя приведенный выше код с busy_foo закомментированным, требуется N = 30, K = 2 (2178309 результатов):

  • ~ 208 с N_CORES = 4
  • 2.78s последовательный оригинал

Травление и отжим, потоки, работающие с блокировками и т. Д., Объясняют эту огромную разницу.с включенным busy_foo для обоих и N = 6, K = 2 (21 результат):

  • 6,45 с N_CORES = 4
  • 30,46 с последовательный оригинал

Здесь вычисления были достаточно тяжелыми, чтобы можно было вернуть накладные расходы.

Numpy

Numpy может многократно ускорить векторизованные операции, но вы, скорее всего, заметите снижение производительности при использовании numpy.Numpy использует непрерывные блоки памяти для своих массивов.При изменении размера массива весь массив придется перестраивать заново, в отличие от использования списков Python.

...