Вы можете использовать многопроцессорный пул для выполнения sh некоторых из этих задач, однако многопроцессорная обработка также является дорогостоящей операцией, поэтому вам необходимо проверить, действительно ли распараллеливание происходит быстрее, это зависит от типа выполняемых вами функций. и данные, например, я создаю образец df
:
import pandas as pd
import numpy as np
from random import randint
from multiprocessing import Pool, cpu_count
from timeit import timeit
def f(df: pd.DataFrame):
df['Something'] = df['Users'].apply(lambda name: len(name))
df['Other stuff'] = df['Income'].apply(lambda income: 'Senior' if income > 200 else 'Junior')
df['Some other stuff'] = df['Users'].apply(lambda name: name.count('1'))
return df
if __name__ == '__main__':
samples = 5000000
df = pd.DataFrame(
[
['user_' + str(i), randint(0, 500)] for i in range(1, samples)
], columns=['Users', 'Income']
)
Если мы синхронизируем эту версию функции f
с многопроцессорной обработкой, я получу 38.189200899999996
в моем старом ноутбуке:
parallelized = timeit("""
cores = cpu_count()
df_in_chunks = np.array_split(df, cores)
pool = Pool(cores)
result_df = pd.concat(pool.map(f, df_in_chunks))
pool.close()
pool.join()
""",
"from __main__ import pd, np, df, Pool, cpu_count, f",
number=5
)
print(parallelized)
В этом случае я получаю 25.0754394
, поэтому накладные расходы на использование многопроцессорной обработки превышают время выполнения всего объекта в одном ядре.
not_parallelized = timeit("""
result_df = f(df)
""",
"from __main__ import pd, df, f",
number=5
)
print(not_parallelized)
Однако, если мы добавим больше Сложность функции f
В том, что широковещательная рассылка df
каждому процессу обходится дешевле, чем ее запуск в одном ядре.