Во-первых, я собираюсь упростить ваш код. Поскольку методы в вашем классе никогда не используют переменные класса, я пропущу подход к классу и буду использовать только методы.
Отправной точкой является пример из документации для многопроцессорная обработка . Чтобы увидеть выгоду от использования Pool
, я добавляю две секунды сна и печатаю метку времени.
import datetime
from multiprocessing import Pool
import time
def fx(x):
time.sleep(2)
print(datetime.datetime.utcnow())
return x*x
if __name__ == '__main__':
with Pool() as p:
print(p.map(fx, range(10)))
Вывод соответствует ожидаемому
2019-11-10 11:10:05.346985
2019-11-10 11:10:05.363975
2019-11-10 11:10:05.418941
2019-11-10 11:10:05.435931
2019-11-10 11:10:07.347753
2019-11-10 11:10:07.364741
2019-11-10 11:10:07.419707
2019-11-10 11:10:07.436697
2019-11-10 11:10:09.348518
2019-11-10 11:10:09.365508
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Поскольку я не указалколичество ядер, все доступные ядра используются (на моей машине 4). Это видно по меткам времени: 4 метки времени находятся рядом друг с другом. Затем исключение приостанавливается до тех пор, пока ядра не будут снова освобождены.
Вы хотите использовать метод matthews_corrcoef
, который принимает в соответствии с документацией два аргумента y_true
и y_pred
.
Прежде чем использовать этот метод, давайте изменим тестовый метод сверху, чтобы принять два аргумента:
def fxy(x, y):
time.sleep(2)
print(datetime.datetime.utcnow())
return x*y
Из документации для multiprocessing.pool.Pool мы узнаем, что map
принимает только один аргументПоэтому я собираюсь использовать apply_async
вместо этого. Поскольку apply_async
возвращает объекты результата вместо возвращаемых значений метода, я использую список для хранения результатов и получения возвращаемых значений в отдельном цикле, подобном следующему:
if __name__ == '__main__':
with Pool() as p:
res = []
for i in range(10):
res.append(p.apply_async(fxy, args = (i, i)))
for item in res:
print(item.get())
Это дает аналогичный выводк первому подходу:
2019-11-10 11:41:24.987093
0
2019-11-10 11:41:24.996087
1
2019-11-10 11:41:25.008079
2019-11-10 11:41:25.002083
4
9
2019-11-10 11:41:26.988859
16
2019-11-10 11:41:27.009847
2019-11-10 11:41:27.009847
25
36
2019-11-10 11:41:27.011845
49
2019-11-10 11:41:28.989623
64
2019-11-10 11:41:29.019606
81
Теперь для matthews_corrcoef
. Для лучшей проверки результатов (ваши ошибки выброса pred
и man
при применении к matthews_corrcoef
) я использую номенклатуру и значения, как в примере в документации matthews_corrcoef .
import datetime
from multiprocessing import Pool
import numpy as np
from sklearn.metrics import matthews_corrcoef
def mcc_score(y_true, y_pred):
print(datetime.datetime.utcnow())
return matthews_corrcoef(y_true, y_pred)
y_true = [+1, +1, +1, -1]
y_pred = [+1, -1, +1, +1]
if __name__ == '__main__':
with Pool() as p:
res = []
for i in range(10):
res.append(p.apply_async(mcc_score, args = (y_true, y_pred)))
for item in res:
print(item.get())
Результаты ожидаемые:
2019-11-10 11:49:07.309389
2019-11-10 11:49:07.345366
2019-11-10 11:49:07.375348
2019-11-10 11:49:07.393336
2019-11-10 11:49:07.412325
2019-11-10 11:49:07.412325
2019-11-10 11:49:07.412325
-0.3333333333333333
-0.3333333333333333
-0.3333333333333333
-0.3333333333333333
-0.3333333333333333
-0.3333333333333333
-0.3333333333333333
2019-11-10 11:49:07.420319
2019-11-10 11:49:07.420319
2019-11-10 11:49:07.413325
-0.3333333333333333
-0.3333333333333333
-0.3333333333333333