'dmlc::Error' when using xgboost with multiprocessing.Pool
0 votes
14 February 2019

Dear fellow data scientists, I have a problem combining multiprocessing.Pool with my own nested cross-validation function.

In this cross-validation function I use an inner CV for a grid search to find the best hyperparameters, and an outer CV to evaluate the selected hyperparameters. Below you can find a reproducible example that runs as-is if you copy it. I am fairly sure the problem is caused by an interaction between the xgboost package and the multiprocessing package. What strikes me as particularly strange is that the first outer cross-validation fold finishes exactly as I hoped, but at the start of the second one the run breaks right before the first _inner_model.fit call.
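To keep the structure in view before the long listing, here is a stripped-down skeleton of the full script further below (simplified names, same outer CV -> Pool -> inner CV -> fit layout, and the fit() keyword arguments of the xgboost 0.81 API I am using); the comment marks the point where the second outer fold dies:

# Stripped-down skeleton of the full reproducible script below, only to show
# the structure and the point where it breaks.
import multiprocessing
import xgboost
from sklearn import datasets
from sklearn.model_selection import StratifiedKFold

def inner_cv(params, X_tr, y_tr):
    # one grid-search candidate: fit on the inner folds and average the scores
    model = xgboost.XGBClassifier(**params)
    scores = []
    for tr, te in StratifiedKFold(n_splits=5, shuffle=True).split(X_tr, y_tr):
        # in the SECOND outer fold the workers die right before this fit()
        model.fit(X_tr[tr], y_tr[tr],
                  eval_set=[(X_tr[te], y_tr[te])],
                  eval_metric="auc", early_stopping_rounds=20, verbose=False)
        scores.append(model.best_score)
    return sum(scores) / len(scores)

if __name__ == "__main__":
    X, y = datasets.make_classification(n_samples=500, n_features=30, random_state=42)
    for tr, te in StratifiedKFold(n_splits=5, shuffle=True).split(X, y):   # outer CV
        with multiprocessing.Pool(processes=3) as pool:                    # fork-based pool
            jobs = [pool.apply_async(inner_cv, (p, X[tr], y[tr]))
                    for p in ({"max_depth": 5}, {"max_depth": 9})]
            print([j.get() for j in jobs])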

I am running this on:
Distribution: Linux Mint 19 Tara, kernel 4.15.0-38-generic
PyCharm 2019.2.4 (Community Edition), build #PC-182_152-release-1248-b8 amd64
Python 3.6 with Anaconda 5.3.0
xgboost 0.81

I have come across similar issues / possible solutions:

https://github.com/dmlc/xgboost/issues/2163
https://gcc.gnu.org/bugzilla/show_bug.cgi?id=58378
https://gcc.gnu.org/bugzilla/attachment.cgi?id=30784&action=diff
https://github.com/joblib/joblib/issues/138
https://bugs.python.org/issue18999
https://hg.python.org/cpython/rev/72a5ac909c7a
https://github.com/scikit-learn/scikit-learn/issues/2889
https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
https://bisqwit.iki.fi/story/howto/openmp/#OpenmpAndFork
https://github.com/scikit-learn/scikit-learn/issues/6627


So far, however, I have not been able to find a solution. I urge you not to be put off by the amount of code, because I think the most likely problem is the package interaction I mentioned, not the way I structured the code. To make things simpler, at the very bottom of the post I have also added a very similar script that does almost the same thing as the first one, but with the Random Forest classifier from scikit-learn.
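For reference, the alternative that the multiprocessing documentation linked above describes (creating the pool from a 'spawn' context instead of the default 'fork' start method on Linux) would look roughly like the sketch below applied to my pool setup (start_process is the initializer defined in the code further down); this is only an illustration of that option, not something I claim resolves the error:

import multiprocessing

# Sketch only: build the pool from a 'spawn' context rather than the default
# 'fork' start method on Linux (see the multiprocessing docs linked above).
# With 'spawn' the worker function has to be importable, i.e. defined at
# module level, and the calling code guarded by `if __name__ == "__main__":`.
ctx = multiprocessing.get_context("spawn")
pool = ctx.Pool(processes=multiprocessing.cpu_count() - 1,
                initializer=start_process)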

Code:

import xgboost
import time
import itertools
import multiprocessing
from sklearn import datasets
from sklearn.model_selection import StratifiedKFold
# Creating a function for the inner cross-validation (used for the grid search) which will be submitted as a task via Pool.apply_async
def multiprocess_inner_crossvalidation(
        _collection_of_hyperparameters_and_data,
        _X_train,
        _y_train,
        _skfInner,
        _scores):
    _inner_model = xgboost.XGBClassifier(**(_collection_of_hyperparameters_and_data[0]))
    # inner cv fold splits
    for train_index_inner, test_index_inner in _skfInner.split(_X_train, _y_train):
        X_train_inner, X_test_inner = (_collection_of_hyperparameters_and_data[1])[train_index_inner], \
                                      (_collection_of_hyperparameters_and_data[1])[test_index_inner]
        y_train_inner, y_test_inner = (_collection_of_hyperparameters_and_data[2])[train_index_inner], \
                                      (_collection_of_hyperparameters_and_data[2])[test_index_inner]
        _inner_model.fit(
            X_train_inner,
            y_train_inner,
            verbose=False,
            early_stopping_rounds=20,
            eval_metric="auc",
            eval_set=[(X_test_inner, y_test_inner)])
        _scores.append(_inner_model.best_score)
    avgScore = float(sum(_scores)) / len(_scores)
    current__inner_model_params = _inner_model.get_params()
    return [current__inner_model_params, avgScore]
# just an indication to notify when new pool workers are created
def start_process():
    print('Starting', multiprocessing.current_process().name)
# defining the main function
def main_function(XGB_classifier_and_gridsearch_parameters):
    skf = StratifiedKFold(
        n_splits=5,
        shuffle=True)
    skfInner = StratifiedKFold(
        n_splits=5,
        shuffle=True)
    def my_product(dicts):
        return (dict(
            zip(dicts, x)) for x in itertools.product(*dicts.values()))
    bestModelScores = []
    bestModelsOuterparams = []
    outerCounter = 1
    #creating mock data
    X, y = datasets.make_classification(n_samples=500, n_features=30,
                                        n_informative=15, n_redundant=15,
                                        random_state=42)
    # outer cv splits
    for train_index, test_index in skf.split(X, y):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]
        print("\n", 80 * ("-"), " STARTED OUTER CROSSVALIDATION ", outerCounter, 80         * ("-"))
        scores = []
        hyperparameter_combinations = []
        collection_of_hyperparameters_and_data = []
        for params in my_product(XGB_classifier_and_gridsearch_parameters):
            hyperparameter_combinations.append(params)
        collection_of_hyperparameters_and_data = [[x, X_train, y_train] for x in hyperparameter_combinations]
        inputs = collection_of_hyperparameters_and_data
        # print all tasks which will be forwarded to workers
        print("Pool tasks:")
        for i in inputs:
            print(i[0])
        #initializing  pool of workers and giving them tasks
        pool_size = multiprocessing.cpu_count() - 1
        pool = multiprocessing.Pool(processes=pool_size,
                                    initializer=start_process,
                                    )
        pool_outputs = [pool.apply_async(multiprocess_inner_crossvalidation,
                                         args=(x,
                                               X_train,
                                               y_train,
                                               skfInner,
                                               scores)) for x in inputs]
        pool.close()
        pool.join()
        # appending results and choosing parameters which gave best score
        pool_results = [p.get() for p in pool_outputs]
        tempModels = pool_results
        tempModels.sort(key=lambda x: x[1])
        bestMod = tempModels[-1][0]
        outerModel = xgboost.XGBClassifier(**bestMod)
        outerModel.fit(
            X_train,
            y_train,
            verbose=False,
            early_stopping_rounds=20,
            eval_metric="auc",
            eval_set=[(X_test, y_test)])
        #appending outer fold results
        bestModelScores.append(outerModel.best_score)
        outer_model_params = outerModel.get_params()
        bestModelsOuterparams.append([outer_model_params, outerModel.best_score])
        print("\n", 30 * ("-"), "FINISHING OUTER CROSS VALIDATION", outerCounter, 30     * ("-"))
        print('\nOuter crossvalidation score of the best inner model is:',     outerModel.best_score)
        outerCounter += 1
    #printing average results of the outer folds
    avgBestModelScores = float(sum(bestModelScores)) / len(bestModelScores)
    bestModelsOuterparams.sort(key=lambda x: x[1])
    print("Average score of outer crossvalidation models is", avgBestModelScores)
main_function(XGB_classifier_and_gridsearch_parameters={'objective': ['binary:logistic'], 'nthread': [1], "max_depth": [5, 9], "subsample": [0.7, 1]})

OUTPUT AND ERROR MESSAGE:

-------------------------------------------------------------------------------  STARTED OUTER CROSSVALIDATION  1 --------------------------------------------------------------------------------
Pool tasks:
{'objective': 'binary:logistic', 'nthread': 1, 'max_depth': 5, 'subsample': 0.7}
{'objective': 'binary:logistic', 'nthread': 1, 'max_depth': 5, 'subsample': 1}
{'objective': 'binary:logistic', 'nthread': 1, 'max_depth': 9, 'subsample': 0.7}
{'objective': 'binary:logistic', 'nthread': 1, 'max_depth': 9, 'subsample': 1}
Starting ForkPoolWorker-1
Starting ForkPoolWorker-2
Starting ForkPoolWorker-3

 ------------------------------ FINISHING OUTER CROSS VALIDATION 1 ------------------------------

Outer crossvalidation score of the best inner model is: 0.976078

 --------------------------------------------------------------------------------  STARTED OUTER CROSSVALIDATION  2 --------------------------------------------------------------------------------
Pool tasks:
{'objective': 'binary:logistic', 'nthread': 1, 'max_depth': 5, 'subsample': 0.7}
{'objective': 'binary:logistic', 'nthread': 1, 'max_depth': 5, 'subsample': 1}
{'objective': 'binary:logistic', 'nthread': 1, 'max_depth': 9, 'subsample': 0.7}
{'objective': 'binary:logistic', 'nthread': 1, 'max_depth': 9, 'subsample': 1}
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
terminate called after throwing an instance of 'dmlc::Error'
  what():  [17:34:11] /workspace/include/xgboost/../../src/common/common.h:41: /workspace/src/common/host_device_vector.cu: 151: initialization error

Stack trace returned 10 entries:
[bt] (0) /home/user/envs/application/xgboost/libxgboost.so(dmlc::StackTrace()+0x3d) [0x7f283eeca5cd]
[bt] (1) /home/user/envs/application/xgboost/libxgboost.so(dmlc::LogMessageFatal::~LogMessageFatal()+0x18) [0x7f283eeca9c8]
[bt] (2) /home/user/envs/application/xgboost/libxgboost.so(dh::ThrowOnCudaError(cudaError, char const*, int)+0x178) [0x7f283f09cc18]
[bt] (3) /home/user/envs/application/xgboost/libxgboost.so(xgboost::HostDeviceVectorImpl<int>::DeviceShard::Init(xgboost::HostDeviceVectorImpl<int>*, int)+0x278) [0x7f283f0fa498]
[bt] (4) /home/user/envs/application/xgboost/libxgboost.so(+0x2fdfeb) [0x7f283f0ccfeb]
[bt] (5) /home/user/envs/application/xgboost/libxgboost.so(xgboost::HostDeviceVectorImpl<int>::Reshard(xgboost::GPUDistribution const&)+0x1b1) [0x7f283f0fb631]
[bt] (6) /home/user/envs/application/xgboost/libxgboost.so(xgboost::obj::RegLossObj<xgboost::obj::LogisticClassification>::GetGradient(xgboost::HostDeviceVector<float> const&, xgboost::MetaInfo const&, int, xgboost::HostDeviceVector<xgboost::detail::GradientPairInternal<float> >*)+0x4f7) [0x7f283f0c06b7]
[bt] (7) /home/user/envs/application/xgboost/libxgboost.so(xgboost::LearnerImpl::UpdateOneIter(int, xgboost::DMatrix*)+0x362) [0x7f283ef411e2]
[bt] (8) /home/user/envs/application/xgboost/libxgboost.so(XGBoosterUpdateOneIter+0x35) [0x7f283eec2ab5]
[bt] (9) /home/user/envs/application/lib/python3.6/lib-dynload/../../libffi.so.6(ffi_call_unix64+0x4c) [0x7f28619dfec0]


terminate called after throwing an instance of 'dmlc::Error'
  what():  [17:34:11] /workspace/include/xgboost/../../src/common/common.h:41: /workspace/src/common/host_device_vector.cu: 151: initialization error

Stack trace returned 10 entries:
[bt] (0) /home/user/envs/application/xgboost/libxgboost.so(dmlc::StackTrace()+0x3d) [0x7f283eeca5cd]
[bt] (1) /home/user/envs/application/xgboost/libxgboost.so(dmlc::LogMessageFatal::~LogMessageFatal()+0x18) [0x7f283eeca9c8]
[bt] (2) /home/user/envs/application/xgboost/libxgboost.so(dh::ThrowOnCudaError(cudaError, char const*, int)+0x178) [0x7f283f09cc18]
[bt] (3) /home/user/envs/application/xgboost/libxgboost.so(xgboost::HostDeviceVectorImpl<int>::DeviceShard::Init(xgboost::HostDeviceVectorImpl<int>*, int)+0x278) [0x7f283f0fa498]
[bt] (4) /home/user/envs/application/xgboost/libxgboost.so(+0x2fdfeb) [0x7f283f0ccfeb]
[bt] (5) /home/user/envs/application/xgboost/libxgboost.so(xgboost::HostDeviceVectorImpl<int>::Reshard(xgboost::GPUDistribution const&)+0x1b1) [0x7f283f0fb631]
[bt] (6) /home/user/envs/application/xgboost/libxgboost.so(xgboost::obj::RegLossObj<xgboost::obj::LogisticClassification>::GetGradient(xgboost::HostDeviceVector<float> const&, xgboost::MetaInfo const&, int, xgboost::HostDeviceVector<xgboost::detail::GradientPairInternal<float> >*)+0x4f7) [0x7f283f0c06b7]
[bt] (7) /home/user/envs/application/xgboost/libxgboost.so(xgboost::LearnerImpl::UpdateOneIter(int, xgboost::DMatrix*)+0x362) [0x7f283ef411e2]
[bt] (8) /home/user/envs/application/xgboost/libxgboost.so(XGBoosterUpdateOneIter+0x35) [0x7f283eec2ab5]
[bt] (9) /home/user/envs/application/lib/python3.6/lib-dynload/../../libffi.so.6(ffi_call_unix64+0x4c) [0x7f28619dfec0]


terminate called after throwing an instance of 'dmlc::Error'
  what():  [17:34:11] /workspace/include/xgboost/../../src/common/common.h:41: /workspace/src/common/host_device_vector.cu: 151: initialization error

Stack trace returned 10 entries:
[bt] (0) /home/user/envs/application/xgboost/libxgboost.so(dmlc::StackTrace()+0x3d) [0x7f283eeca5cd]
[bt] (1) /home/user/envs/application/xgboost/libxgboost.so(dmlc::LogMessageFatal::~LogMessageFatal()+0x18) [0x7f283eeca9c8]
[bt] (2) /home/user/envs/application/xgboost/libxgboost.so(dh::ThrowOnCudaError(cudaError, char const*, int)+0x178) [0x7f283f09cc18]
[bt] (3) /home/user/envs/application/xgboost/libxgboost.so(xgboost::HostDeviceVectorImpl<int>::DeviceShard::Init(xgboost::HostDeviceVectorImpl<int>*, int)+0x278) [0x7f283f0fa498]
[bt] (4) /home/user/envs/application/xgboost/libxgboost.so(+0x2fdfeb) [0x7f283f0ccfeb]
[bt] (5) /home/user/envs/application/xgboost/libxgboost.so(xgboost::HostDeviceVectorImpl<int>::Reshard(xgboost::GPUDistribution const&)+0x1b1) [0x7f283f0fb631]
[bt] (6) /home/user/envs/application/xgboost/libxgboost.so(xgboost::obj::RegLossObj<xgboost::obj::LogisticClassification>::GetGradient(xgboost::HostDeviceVector<float> const&, xgboost::MetaInfo const&, int, xgboost::HostDeviceVector<xgboost::detail::GradientPairInternal<float> >*)+0x4f7) [0x7f283f0c06b7]
[bt] (7) /home/user/envs/application/xgboost/libxgboost.so(xgboost::LearnerImpl::UpdateOneIter(int, xgboost::DMatrix*)+0x362) [0x7f283ef411e2]
[bt] (8) /home/user/envs/application/xgboost/libxgboost.so(XGBoosterUpdateOneIter+0x35) [0x7f283eec2ab5]
[bt] (9) /home/user/envs/application/lib/python3.6/lib-dynload/../../libffi.so.6(ffi_call_unix64+0x4c) [0x7f28619dfec0]


Starting ForkPoolWorker-7
Starting ForkPoolWorker-8
terminate called after throwing an instance of 'dmlc::Error'
  what():  [17:34:11] /workspace/include/xgboost/../../src/common/common.h:41: /workspace/src/common/host_device_vector.cu: 151: initialization error

Stack trace returned 10 entries:
[bt] (0) /home/user/envs/application/xgboost/libxgboost.so(dmlc::StackTrace()+0x3d) [0x7f283eeca5cd]
[bt] (1) /home/user/envs/application/xgboost/libxgboost.so(dmlc::LogMessageFatal::~LogMessageFatal()+0x18) [0x7f283eeca9c8]
[bt] (2) /home/user/envs/application/xgboost/libxgboost.so(dh::ThrowOnCudaError(cudaError, char const*, int)+0x178) [0x7f283f09cc18]
[bt] (3) /home/user/envs/application/xgboost/libxgboost.so(xgboost::HostDeviceVectorImpl<int>::DeviceShard::Init(xgboost::HostDeviceVectorImpl<int>*, int)+0x278) [0x7f283f0fa498]
[bt] (4) /home/user/envs/application/xgboost/libxgboost.so(+0x2fdfeb) [0x7f283f0ccfeb]
[bt] (5) /home/user/envs/application/xgboost/libxgboost.so(xgboost::HostDeviceVectorImpl<int>::Reshard(xgboost::GPUDistribution const&)+0x1b1) [0x7f283f0fb631]
[bt] (6) /home/user/envs/application/xgboost/libxgboost.so(xgboost::obj::RegLossObj<xgboost::obj::LogisticClassification>::GetGradient(xgboost::HostDeviceVector<float> const&, xgboost::MetaInfo const&, int, xgboost::HostDeviceVector<xgboost::detail::GradientPairInternal<float> >*)+0x4f7) [0x7f283f0c06b7]
[bt] (7) /home/user/envs/application/xgboost/libxgboost.so(xgboost::LearnerImpl::UpdateOneIter(int, xgboost::DMatrix*)+0x362) [0x7f283ef411e2]
[bt] (8) /home/user/envs/application/xgboost/libxgboost.so(XGBoosterUpdateOneIter+0x35) [0x7f283eec2ab5]
[bt] (9) /home/user/envs/application/lib/python3.6/lib-dynload/../../libffi.so.6(ffi_call_unix64+0x4c) [0x7f28619dfec0]


Starting ForkPoolWorker-9
Starting ForkPoolWorker-10

And finally, the Random Forest version, which does not throw the error:

import xgboost
import time
import itertools
import multiprocessing
from multiprocessing import get_context
from sklearn import datasets
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier
def multiprocess_inner_crossvalidation(
        _collection_of_hyperparameters_and_data,
        _X_train,
        _y_train,
        _skfInner,
        _scores):
    params = _collection_of_hyperparameters_and_data[0]
    print(params)
    _inner_model = RandomForestClassifier(**params)
    for train_index_inner, test_index_inner in _skfInner.split(_X_train, _y_train):
        X_train_inner, X_test_inner = (_collection_of_hyperparameters_and_data[1])[train_index_inner], \
                                      (_collection_of_hyperparameters_and_data[1])[test_index_inner]
        y_train_inner, y_test_inner = (_collection_of_hyperparameters_and_data[2])[train_index_inner], \
                                      (_collection_of_hyperparameters_and_data[2])[test_index_inner]
        _inner_model.fit(
            X_train_inner,
            y_train_inner,
            # verbose=False,
            # early_stopping_rounds=20,
            # eval_metric="auc",
            # eval_set=[(X_test_inner, y_test_inner)]
        )
        _scores.append(_inner_model.oob_score)
    avgScore = float(sum(_scores)) / len(_scores)
    current__inner_model_params = _inner_model.get_params()
    # tempModels.append([model.get_params(), avgScore])
    print("MOMENT BEFORE EXITING INNER LOOP")
    return [current__inner_model_params, avgScore]
def start_process():
    print('Starting', multiprocessing.current_process().name)
def main_function(XGB_classifier_and_gridsearch_parameters):
    skf = StratifiedKFold(
        n_splits=5,
        shuffle=True)
    skfInner = StratifiedKFold(
        n_splits=5,
        shuffle=True)
    def my_product(dicts):
        return (dict(
            zip(dicts, x)) for x in itertools.product(*dicts.values()))
    bestModelScores = []
    bestModelsOuterparams = []
    outerCounter = 1
    X, y = datasets.make_classification(n_samples=500, n_features=30,
                                        n_informative=15, n_redundant=15,
                                        random_state=42)
    start_time = time.time()
    for train_index, test_index in skf.split(X, y):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]
        print("\n", 80 * ("-"), " STARTED OUTER CROSSVALIDATION ", outerCounter, 80 * ("-"))
        scores = []
        hyperparameter_combinations = []
        collection_of_hyperparameters_and_data = []
        for params in my_product(XGB_classifier_and_gridsearch_parameters):
            hyperparameter_combinations.append(params)
        collection_of_hyperparameters_and_data = [[x, X_train, y_train] for x in hyperparameter_combinations]
        inputs = collection_of_hyperparameters_and_data
        for i in inputs:
            print(i[0])
        pool_size = multiprocessing.cpu_count()
        pool = multiprocessing.Pool(processes=pool_size,
                                    initializer=start_process,
                                    )
        pool_outputs = [pool.apply_async(multiprocess_inner_crossvalidation,
                                         args=(x,
                                               X_train,
                                               y_train,
                                               skfInner,
                                               scores)) for x in inputs]
        pool.close()  # no more tasks
        pool.join()  # wrap up current tasks
        pool_results = [p.get() for p in pool_outputs]
        print("Pool results", pool_results)
        tempModels = pool_results
        tempModels.sort(key=lambda x: x[1])
        bestMod = tempModels[-1][0]
        outerModel = RandomForestClassifier(**bestMod)
        outerModel.fit(
            X_train,
            y_train,
            # verbose=False,
            # early_stopping_rounds=20,
            # eval_metric="auc",
            # eval_set=[(X_test, y_test)]
        )
        bestModelScores.append(outerModel.feature_importances_)
        outer_model_params = outerModel.get_params()
        bestModelsOuterparams.append([outer_model_params, outerModel.feature_importances_])
        print("\n\n\n ", 30 * ("-"), "FINISHING OUTER CROSS VALIDATION", outerCounter, 30 * ("-"))
        print('\nOuter crossvalidation feature importances:', outerModel.feature_importances_)
        print("\n Current outer model params are:\n", outer_model_params, "\n\n\n\n")
        outerCounter += 1
    print("Standard procedure run time was ", start_time - time.time())
    # print("best outer model score was:", best_outer_mod_score)
    return
main_function(XGB_classifier_and_gridsearch_parameters={"n_estimators": [10, 20], "max_depth": [2, 3]})