Моя глобальная цель:
Чтобы разделить один массив размером 50 ГБ numpy. Между 20 параллельными процессами, при этом одна и та же функция применяется к массиву numpy. Внутри каждого процесс Python версии 3.8.2.
Я использую следующие три подхода и терплю неудачу везде:
- multiprocessing.Manager ()
import multiprocessing
import numpy as np
if __name__ == '__main__':
manager = multiprocessing.Manager()
x_tr = manager.list( np.random.random( (100000,10000))) # create shared memory for the processes
ОШИБКА выдана: "OSError: [WinError 87]" Причина: превышено ограничение для multiprocessing.Manager (). List (). 2 ^ 30 для float32 или 2 ^ 29 для float64 Просто убедитесь, что это назначение python выполняется без обычных ошибок: x_tr = multiprocessing.Manager (). List (np.random.random ((53680, 10000)))
> 2. joblib.Parallel
import numpy as np
import lightgbm as lgb
from joblib import Parallel, delayed
# the function to generate training or valid set
def generate_train_valid_set(size):
predictors = np.random.random((size, 10000))
targets =np.ravel( np.random.random((size, 1)))
targets[targets >= 0.6] = 1
targets[targets < 0.6] = 0
return [predictors, targets]
x_tr, y_tr=generate_train_valid_set(100000) # training set
x_val, y_val=generate_train_valid_set(20000) # validation set
resulting_dict={} # the dictionary where i wirte output from parallel threads
# the function, which i want to run on parallel for different flag but on the same big shared x_tr,x_val
def function_on_big_shared_array( flag):
print ("start calc flag:" ,flag)
clf = lgb.LGBMClassifier(min_data_in_leaf=500,
max_bin=255,
n_estimators=500, min_sum_hessian_in_leaf=1, importance_type='gain',
learning_rate=0.1, bagging_fraction=0.85,
colsample_bytree=1.0, feature_fraction=0.1, lambda_l1=5.0,
lambda_l2=3.0, max_depth=155,
min_child_samples=500, min_child_weight=5.0, min_split_gain=0.1,
num_leaves=45, subsample=0.75)
clf.fit(x_tr, y_tr, eval_set=[(x_val, y_val)], early_stopping_rounds=20, verbose=False)
feature_imp = clf.feature_importances_
resulting_dict[ flag] = feature_imp
# run the code and see that require='sharedmem' freezes and doesn't calculate function_on_big_shared_array in parallel
Parallel(n_jobs=20, require='sharedmem', verbose=10)( delayed( function_on_big_shared_array)(flag) for flag in range(0,20))
NoError, просто убедитесь, что этот фрагмент вообще не работает параллельно.
> 3. Вставьте предпочитаемые = 'процессы' в joblib. Параллельно
Parallel(n_jobs=10, require='sharedmem' , prefer='processes', verbose=10)( delayed(get_feature_importances_pos_shared)(flg) for flg in opt_prm["flag_to_predict_par"])
ОШИБКА: ValueError: предпочитать == 'процессы' и требовать == 'sharedm Это противоречивые настройки