Быстрая многопроцессорность для небольших процессов в Python - PullRequest
0 голосов
/ 28 мая 2020

Я столкнулся со следующей проблемой: в более крупном классе python я хотел бы выполнить его часть в нескольких процессах, чтобы ускорить мою программу. Как вы видите в моем минимальном примере, у меня есть функция f (x), которую я вызываю 10 раз. Выполнение этого с помощью одного процесса занимает около 1 секунды. В многопроцессорном режиме я хотел бы приблизиться к 100 мс. Поэтому я уже пробовал методы Pool.map и Pool.imap_unordered. Как видите, они намного быстрее, но не дают желаемых результатов.

Вы можете мне помочь?

import time
from multiprocessing import Pool


class TestClass(object):

    def __init__(self):
        # Single Process
        self.result_list = []
        self.single_process()
        print(self.result_list)

        # Multiprocess ordered
        self.result_list = []
        self.multiprocessing_ordered()
        print(self.result_list)

        # Multiprocess unordered
        self.result_list = []
        self.multiprocessing_unordered()
        print(self.result_list)

    def f(self, x):
        time.sleep(0.1)
        self.result_list.append(x**2)

    def single_process(self):
        # Single process
        start_time = time.time()
        for x in range(10):
            self.f(x)
        print("Time with a single process: {0:.1f}".format((time.time() - start_time)*1e3))

    def multiprocessing_ordered(self):
        start_time = time.time()
        pool = Pool(10)
        pool.map(self.f, list(range(10)))
        pool.close()
        print("Time with multiprocessing (ordered): {0:.1f}".format((time.time() - start_time)*1e3))

    def multiprocessing_unordered(self):
        start_time = time.time()
        pool = Pool(10)
        pool.imap_unordered(self.f, list(range(10)))
        pool.close()
        print("Time with multiprocessing (unordered): {0:.1f}".format((time.time() - start_time)*1e3))


if __name__ == '__main__':

    test_object = TestClass()

Результат:

Time with a single process: 1013.7 ms
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Time with multiprocessing (ordered): 280.3 ms
[]
Time with multiprocessing (unordered): 100.7 ms
[]

1 Ответ

1 голос
/ 28 мая 2020

Процессы находятся в собственном пространстве памяти, и поэтому self.result_list не используется совместно вашим родительским и дочерним процессами.

Вы знаете, что есть несколько вариантов:

  1. Используйте каналы или очереди или любой другой te для связи между вашими подпроцессами и вашим основным процессом.

  2. Возвращает результат вашим функция и получить ее в основном процессе следующим образом:

import time
from multiprocessing import Pool


class TestClass(object):

    def __init__(self):
        # Single Process
        self.result_list = []
        self.single_process()
        print(self.result_list)

        # Multiprocess
        self.result_list = []
        self.multiprocessing()
        print(self.result_list)

    def f(self, x):
        time.sleep(0.1)
        return x**2

    def single_process(self):
        # Single process
        start_time = time.time()
        result = []
        for x in range(10):
             self.result_list.append(self.f(x))
        print("Time with a single process: {0:.1f}".format((time.time() - start_time)*1e3))

    def multiprocessing(self):
        pool = Pool(10)

        # Start calculation
        start_time = time.time()
        multiple_results = [pool.apply_async(self.f, (i,)) for i in range(10)]

        # Receive answer
        self.result_list = [res.get(timeout=1) for res in multiple_results]

        # Evaluate result
        print("Time with multiprocessing: {0:.1f}".format((time.time() - start_time)*1e3))

        pool.close()


if __name__ == '__main__':

    test_object = TestClass()

Результат:

Time with a single process: 1002.0
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Time with multiprocessing: 102.8
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Попробуйте модуль threading или ThreadPool . Для потоковой передачи используется одно и то же пространство памяти, и поэтому результаты можно разделить, просто добавив их в список. С ThreadPool:
import time
from multiprocessing.pool import ThreadPool

class TestClass(object):

    def __init__(self):
        # Single Process
        self.result_list = []
        self.single_process()
        print(self.result_list)

        # Multiprocess ordered
        self.result_list = []
        self.multiprocessing_ordered()
        print(self.result_list)

        # Multiprocess unordered
        self.result_list = []
        self.multiprocessing_unordered()
        print(self.result_list)

    def f(self, x):
        time.sleep(0.1)
        self.result_list.append(x**2)

    def single_process(self):
        # Single process
        start_time = time.time()
        for x in range(10):
            self.f(x)
        print("Time with a single process: {0:.1f}".format((time.time() - start_time)*1e3))

    def multiprocessing_ordered(self):
        start_time = time.time()
        pool = ThreadPool(10)
        pool.map(self.f, list(range(10)))
        pool.close()
        print("Time with multiprocessing (ordered): {0:.1f}".format((time.time() - start_time)*1e3))

    def multiprocessing_unordered(self):
        start_time = time.time()
        pool = ThreadPool(10)
        [_ for _ in pool.imap_unordered(self.f, list(range(10)))]
        pool.close()
        print("Time with multiprocessing (unordered): {0:.1f}".format((time.time() - start_time)*1e3))


if __name__ == '__main__':

    test_object = TestClass()

Результат с ThreadPool:

Time with a single process: 1002.0
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Time with multiprocessing (ordered): 116.1
[0, 4, 1, 25, 36, 9, 16, 49, 81, 64]
Time with multiprocessing (unordered): 109.4
[0, 1, 4, 16, 25, 36, 9, 49, 81, 64]
...