У меня есть класс Processor
, который принимает некоторые входные данные (которые мы будем называть примерами), обрабатывает входные данные и выводит результаты. На высоком уровне это выглядит так:
from tqdm import tqdm
import multiprocessing
class Processor:
def __init__(self, arg1, arg2, model_path):
self.arg1 = arg1
self.arg2 = arg2
# load model from very large file that will take some time
self.model = load_model_from_path(model_path)
def process_all_examples(self, all_examples):
all_results = []
pool = multiprocessing.Pool(4)
for result in tqdm(pool.imap_unordered(self.process_single_example, all_examples), total=len(all_examples)):
all_results.append(result)
return all_results
def process_single_example(self, example):
# do some complicated calculations on the example that use
# self.arg1, self.arg2, and self.model
return result
Идея состоит в том, что процессор инициализируется один раз (загрузка модели занимает много времени) и может использовать преимущества многоядерной машины для обработки ввода Примеры. Вышеупомянутое не работает, так как методы класса не могут использоваться для многопроцессорной обработки После просмотра следующих сообщений StackOverflow:
вызов многопроцессорной обработки в методе класса Python
Многопроцессорная обработка: как использовать Pool.map для функции, определенной в класс?
Многопроцессорная обработка: как использовать Pool.map для функции, определенной в классе?
Я пришел к следующему решению:
from tqdm import tqdm
import multiprocessing
class Processor:
def __init__(self, arg1, arg2, model_path):
self.arg1 = arg1
self.arg2 = arg2
# load model from very large file that will take some time
self.model = load_model_from_path(model_path)
def process_all_examples(self, all_examples):
all_results = []
all_inputs = [(self, example) for example in all_examples]
pool = multiprocessing.Pool(4)
for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs), total=len(all_inputs)):
all_results.append(result)
return all_results
@staticmethod
def process_single_example(inputs):
self, example = inputs
# do some complicated calculations on the example that use
# self.arg1, self.arg2, and self.model
return result
Однако это не сработало. Если я попытаюсь запустить process_all_examples
, он застрянет на .imap_unordered
. В целях тестирования я попытался использовать некоторые фиктивные данные / обработку, чтобы понять, что происходит, но вместо того, чтобы застрять, многопроцессорная обработка была очень медленной:
from tqdm import tqdm
import multiprocessing
class Processor:
def __init__(self, arg1, arg2):
self.arg1 = arg1
self.arg2 = arg2
# load model from very large file that will take some time
self.model = [i for i in range(1000)]
def process_all_examples_multi(self, all_examples, nproc=4):
all_results = []
all_inputs = [(self, example) for example in all_examples]
pool = multiprocessing.Pool(nproc)
for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs), total=len(all_inputs)):
all_results.append(result)
return all_results
def process_all_examples_single(self, all_examples):
all_results = []
all_inputs = [(self, example) for example in all_examples]
for _input in tqdm(all_inputs):
all_results.append(self.process_single_example(_input))
return all_results
@staticmethod
def process_single_example(inputs):
self, example = inputs
result = self.arg1 * self.arg2 * self.model[3] * example
return result
processor = Processor(-1, 2)
all_examples = list(range(100000))
results = processor.process_all_examples_multi(all_examples) # slower
results = processor.process_all_examples_single(all_examples) # faster
Добавление параметра chunksize
(со значением от 100 до 10000) до .imap_unordered
, по-видимому, значительно повышает производительность, но никогда не превосходит простое использование одного ядра без multiprocessin.Pool
.
Я знаю, что есть альтернативы, одна из которых - редизайн То, как мой код структурирован, другой использует глобальные переменные, но я не могу избавиться от ощущения, что мне здесь чего-то не хватает. Я также пытался использовать модуль pathos.multiprocessing
из библиотеки pathos
, но безрезультатно.