Хитрость в том, чтобы разбить ваш фрейм данных на куски.map
ожидает список объектов, которые будут обработаны model.predict
.Вот полный рабочий пример с явно издевательской моделью:
import numpy as np
import pandas as pd
from multiprocessing import Pool
no_cores = 4
large_df = pd.concat([pd.Series(np.random.rand(1111)), pd.Series(np.random.rand(1111))], axis = 1)
chunk_size = len(large_df) // no_cores + no_cores
chunks = [df_chunk for g, df_chunk in large_df.groupby(np.arange(len(large_df)) // chunk_size)]
class model(object):
@staticmethod
def predict(df):
return np.random.randint(0,2)
def perform_model_predictions(model, dataFrame, cores):
try:
with Pool(processes=cores) as pool:
result = pool.map(model.predict, dataFrame)
return result
# return model.predict(dataFrame)
except AttributeError:
logging.error("AttributeError occurred", exc_info=True)
perform_model_predictions(model, chunks, no_cores)
Помните, что количество блоков здесь выбрано так, чтобы оно соответствовало количеству ядер (или просто любому числу, которое вы хотите выделить).Таким образом, каждое ядро получает справедливую долю и multiprocessing
не тратит много времени на сериализацию объектов.
Если вы хотите обрабатывать каждую строку отдельно (pd.Series
), время, потраченное на сериализацию, может бытьбеспокойство.В таком случае я бы порекомендовал использовать joblib
и читать документы на разных серверах.Я не писал на нем, так как казалось, что вы хотите вызвать прогнозирование на pd.Dataframe
.
Дополнительное предупреждение
Может случиться так, что multiprocessing
вместо полученияВы лучше производительность, сделает ее хуже.Это происходит в довольно редких ситуациях, когда ваш model.predict
вызывает внешние модули, которые сами порождают потоки.Я писал о проблеме здесь .Короче говоря, joblib
снова может быть ответом.