многопроцессорная обработка внутри классов - PullRequest
0 голосов
/ 27 апреля 2020

У меня есть класс Processor, который принимает некоторые входные данные (которые мы будем называть примерами), обрабатывает входные данные и выводит результаты. На высоком уровне это выглядит так:

from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2, model_path):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = load_model_from_path(model_path)

    def process_all_examples(self, all_examples):
        all_results = []
        pool = multiprocessing.Pool(4)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_examples), total=len(all_examples)):
            all_results.append(result)
        return all_results

    def process_single_example(self, example):
        # do some complicated calculations on the example that use
        # self.arg1, self.arg2, and self.model
        return result

Идея состоит в том, что процессор инициализируется один раз (загрузка модели занимает много времени) и может использовать преимущества многоядерной машины для обработки ввода Примеры. Вышеупомянутое не работает, так как методы класса не могут использоваться для многопроцессорной обработки После просмотра следующих сообщений StackOverflow:

вызов многопроцессорной обработки в методе класса Python

Многопроцессорная обработка: как использовать Pool.map для функции, определенной в класс?

Многопроцессорная обработка: как использовать Pool.map для функции, определенной в классе?

Я пришел к следующему решению:

from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2, model_path):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = load_model_from_path(model_path)

    def process_all_examples(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        pool = multiprocessing.Pool(4)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs), total=len(all_inputs)):
            all_results.append(result)
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        # do some complicated calculations on the example that use
        # self.arg1, self.arg2, and self.model
        return result

Однако это не сработало. Если я попытаюсь запустить process_all_examples, он застрянет на .imap_unordered. В целях тестирования я попытался использовать некоторые фиктивные данные / обработку, чтобы понять, что происходит, но вместо того, чтобы застрять, многопроцессорная обработка была очень медленной:

from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = [i for i in range(1000)]

    def process_all_examples_multi(self, all_examples, nproc=4):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        pool = multiprocessing.Pool(nproc)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs), total=len(all_inputs)):
            all_results.append(result)
        return all_results

    def process_all_examples_single(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        for _input in tqdm(all_inputs):
            all_results.append(self.process_single_example(_input))
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        result = self.arg1 * self.arg2 * self.model[3] * example
        return result
processor = Processor(-1, 2)
all_examples = list(range(100000))

results = processor.process_all_examples_multi(all_examples) # slower
results = processor.process_all_examples_single(all_examples) # faster

Добавление параметра chunksize (со значением от 100 до 10000) до .imap_unordered, по-видимому, значительно повышает производительность, но никогда не превосходит простое использование одного ядра без multiprocessin.Pool.

Я знаю, что есть альтернативы, одна из которых - редизайн То, как мой код структурирован, другой использует глобальные переменные, но я не могу избавиться от ощущения, что мне здесь чего-то не хватает. Я также пытался использовать модуль pathos.multiprocessing из библиотеки pathos, но безрезультатно.

1 Ответ

1 голос
/ 27 апреля 2020

При многопроцессорной обработке вам нужно беспокоиться о том, какая полезная нагрузка передается от родителя к потомку, и о проделанной работе. Поскольку вы используете разветвленную операционную систему, родительский и дочерний элементы совместно используют одну и ту же память в момент создания пула. Но вы на самом деле не использовали это, потому что вы передаете self и его данные (вашу модель) ребенку для обработки для каждого рабочего элемента.

Вы можете настроить глобальное состояние, о котором рабочие знают, и поместить туда данные. Все большое попадает в глобальное состояние, и единственное, что передается пулом, - это индекс текущих данных для этого работника. Добавление chunksize уменьшает накладные расходы на связь, так что это полезно добавлять, когда у вас много рабочих элементов, и все они занимают относительно одинаковое количество времени для вычисления. Индивидуальные рабочие расчеты невелики. В этом примере я увеличил объем работы, проделанной за l oop, предполагая, что ваша фактическая работа довольно большая. Но если это не так, бассейн действительно не поможет.

from tqdm import tqdm
import multiprocessing
import threading

# will hold (Processor, example set) for process_all_examples_multi
_process_this = None
_process_this_lock = threading.Lock()

class Processor:
    def __init__(self, arg1, arg2):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = [i for i in range(1000)]

    def process_all_examples_multi(self, all_examples, nproc=4):
        # setup memory state for processing pool
        with _process_this_lock:
            global _process_this
            _process_this = (self, all_examples)
            # context manager deletes pool when done
            with multiprocessing.Pool(nproc) as pool:
                all_results = list(tqdm(pool.imap_unordered(
                    self.process_single_example_2,range(len(all_examples)), chunksize=100), 
                    total=len(all_examples)))
            return all_results

    def process_all_examples_single(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        for _input in tqdm(all_inputs):
            all_results.append(self.process_single_example(_input))
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        result = self.arg1 * self.arg2 * self.model[3] * example
        # lets simulate more work
        for i in range(10000):
            pass
        return result

    @staticmethod
    def process_single_example_2(example_index):
        processor, example = _process_this
        result = processor.arg1 * processor.arg2 * processor.model[3] * example[example_index]
        # lets simulate more work
        for i in range(10000):
            pass
        return result

processor = Processor(-1, 2)
all_examples = list(range(100000))

results = processor.process_all_examples_multi(all_examples)
# vs
results = processor.process_all_examples_single(all_examples)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...