Использовать большой общий массив в параллельных вычислениях или преодолеть ограничение в 2 ^ 30 элементов в преобразовании: numpy_array = multiprocessing.Manager (). List (numpy_array) - PullRequest
0 голосов
/ 04 мая 2020

Моя глобальная цель:

Чтобы разделить один массив размером 50 ГБ numpy. Между 20 параллельными процессами, при этом одна и та же функция применяется к массиву numpy. Внутри каждого процесс Python версии 3.8.2.

Я использую следующие три подхода и терплю неудачу везде:

  1. multiprocessing.Manager ()
import multiprocessing
import numpy as np
if __name__ == '__main__':
    manager = multiprocessing.Manager()
    x_tr = manager.list( np.random.random( (100000,10000))) # create shared memory for the processes

ОШИБКА выдана: "OSError: [WinError 87]" Причина: превышено ограничение для multiprocessing.Manager (). List (). 2 ^ 30 для float32 или 2 ^ 29 для float64 Просто убедитесь, что это назначение python выполняется без обычных ошибок: x_tr = multiprocessing.Manager (). List (np.random.random ((53680, 10000)))

> 2. joblib.Parallel

import numpy as np
import lightgbm as lgb
from joblib import Parallel, delayed

# the function to generate training or valid set
def generate_train_valid_set(size):
    predictors = np.random.random((size, 10000))
    targets =np.ravel( np.random.random((size, 1)))
    targets[targets >= 0.6] = 1
    targets[targets < 0.6] = 0
    return [predictors, targets]

x_tr, y_tr=generate_train_valid_set(100000) # training set
x_val, y_val=generate_train_valid_set(20000) # validation set

resulting_dict={}  # the dictionary where i wirte output from parallel threads

# the function, which i want to run on parallel  for different flag but on the same big shared x_tr,x_val
def function_on_big_shared_array( flag):
    print ("start calc flag:" ,flag)


    clf = lgb.LGBMClassifier(min_data_in_leaf=500,
                                 max_bin=255,
                                 n_estimators=500, min_sum_hessian_in_leaf=1, importance_type='gain',
                                 learning_rate=0.1, bagging_fraction=0.85,
                                 colsample_bytree=1.0, feature_fraction=0.1, lambda_l1=5.0,
                                 lambda_l2=3.0, max_depth=155,
                                 min_child_samples=500, min_child_weight=5.0, min_split_gain=0.1,
                                 num_leaves=45, subsample=0.75)

    clf.fit(x_tr, y_tr, eval_set=[(x_val, y_val)], early_stopping_rounds=20, verbose=False)
    feature_imp = clf.feature_importances_

    resulting_dict[ flag] =   feature_imp

# run the code and see that  require='sharedmem' freezes and doesn't calculate function_on_big_shared_array  in parallel
Parallel(n_jobs=20, require='sharedmem', verbose=10)( delayed(   function_on_big_shared_array)(flag) for flag in range(0,20))

NoError, просто убедитесь, что этот фрагмент вообще не работает параллельно.

> 3. Вставьте предпочитаемые = 'процессы' в joblib. Параллельно

Parallel(n_jobs=10, require='sharedmem' , prefer='processes',  verbose=10)( delayed(get_feature_importances_pos_shared)(flg) for flg in opt_prm["flag_to_predict_par"])

ОШИБКА: ValueError: предпочитать == 'процессы' и требовать == 'sharedm Это противоречивые настройки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...