Медленная многопроцессорная обработка, когда родительский объект содержит большие данные - PullRequest
2 голосов
/ 18 июня 2020

Рассмотрим следующий фрагмент:

import numpy as np
import multiprocessing as mp
import time


def work_standalone(args):
    return 2


class Worker:
    def __init__(self):
        self.data = np.random.random(size=(10000, 10000))
        # leave a trace whenever init is called
        with open('rnd-%d' % np.random.randint(100), 'a') as f:
            f.write('init called\n')

    def work_internal(self, args):
        return 2

    def _run(self, target):
        with mp.Pool() as pool:
            tasks = [[idx] for idx in range(16)]
            result = pool.imap(target, tasks)
            for res in result:
                pass

    def run_internal(self):
        self._run(self.work_internal)

    def run_standalone(self):
        self._run(work_standalone)


if __name__ == '__main__':
    t1 = time.time()
    Worker().run_standalone()
    t2 = time.time()
    print(f'Standalone took {t2 - t1:.3f} seconds')

    t3 = time.time()
    Worker().run_internal()
    t4 = time.time()
    print(f'Internal took {t3 - t4:.3f} seconds')

Т.е. у нас есть объект, содержащий большую переменную, которая использует многопроцессорность для распараллеливания некоторой работы , которая не имеет ничего общего с этой большой переменной , т.е. не читает и не записывает. Расположение рабочего процесса имеет огромное влияние на время выполнения:

Standalone took 0.616 seconds
Internal took 19.917 seconds

Почему это происходит? Я совершенно потерялся. Обратите внимание, что __init__ вызывается только дважды, поэтому случайные данные не создаются для каждого нового процесса в пуле. Единственная причина, по которой я могу думать о том, почему это будет медленным, заключается в том, что данные копируются повсюду, но это не имеет смысла, так как они нигде не используются, а python должен использовать семантику копирования при записи. Также обратите внимание, что разница исчезнет, ​​если вы сделаете run_internal метод stati c.

1 Ответ

3 голосов
/ 20 июня 2020

У вас возникшая проблема из-за target, который вы вызываете из пула. Это target - функция со ссылкой на экземпляр Worker.

Теперь вы правы, что __init__() вызывается только дважды. Но помните, когда вы отправляете что-либо в процессы и из процессов, python сначала нужно будет обработать данные.

Итак, поскольку ваш target равен self.work_internal(), python должен обрабатывать экземпляр Worker() каждый раз, когда вызывается imap. Это приводит к одной проблеме: self.data копируется снова и снова.

Следующее является доказательством. Я просто добавил 1 «входной» оператор и зафиксировал время последнего вычисления времени.

import numpy as np
import multiprocessing as mp
import time


def work_standalone(args):
    return 2


class Worker:
    def __init__(self):
        self.data = np.random.random(size=(10000, 10000))
        #  leave a trace whenever init is called
        with open('rnd-%d' % np.random.randint(100), 'a') as f:
            f.write('init called\n')

    def work_internal(self, args):
        return 2

    def _run(self, target):
        with mp.Pool() as pool:
            tasks = [[idx] for idx in range(16)]
            result = pool.imap(target, tasks)
            input("Wait for analysis")
            for res in result:
                pass

    def run_internal(self):
        self._run(self.work_internal)
        #  self._run(work_standalone)

    def run_standalone(self):
        self._run(work_standalone)


def work_internal(target):
    with mp.Pool() as pool:
        tasks = [[idx] for idx in range(16)]
        result = pool.imap(target, tasks)
        for res in result:
            pass



if __name__ == '__main__':
    t1 = time.time()
    Worker().run_standalone()

    t2 = time.time()
    print(f'Standalone took {t2 - t1:.3f} seconds')

    t3 = time.time()
    Worker().run_internal()
    t4 = time.time()
    print(f'Internal took {t4 - t3:.3f} seconds')

Вы можете запустить код, когда он покажет «ждите анализа», go и проверьте использование памяти.

Примерно так

image 1

Затем во второй раз, когда вы увидите сообщение, нажмите Enter. И наблюдайте, как использование памяти снова увеличивается и уменьшается.

image 2

С другой стороны, если вы измените self._run(self.work_internal) на self._run(work_standalone), вы заметите, что скорость очень высокая, а объем памяти не увеличивается, а также затраченное время намного меньше, чем на выполнение self.work_internal.

Решение

Один из способов решения вашей проблемы - установить self.data в качестве переменной класса stati c. В обычных случаях это предотвратит повторное копирование / повторную инициализацию переменной экземплярам. Это также предотвратило возникновение проблемы.

class Worker:
    data = np.random.random(size=(10000, 10000))
    def __init__(self):
        pass
    ...
...