GridSearchCV в кластере Spark - ImportError: модуль не указан - PullRequest
0 голосов
/ 25 октября 2018

Я пытаюсь выполнить поиск сетки в кластере Spark с библиотекой spark-sklearn.По этой причине я запускаю nohup ./spark_python_shell.sh > output.log & в своей оболочке bash для запуска кластера Spark, и я также запускаю свой скрипт на python (см. Ниже spark-submit \ --master yarn 'rforest_grid_search.py'):

    SPARK_HOME=/u/users/******/spark-2.3.0 \
    Q_CORE_LOC=/u/users/******/****** \
    ENV=local \
    HIVE_HOME=/usr/hdp/current/hive-client \
    SPARK2_HOME=/u/users/******/spark-2.3.0 \
    HADOOP_CONF_DIR=/etc/hadoop/conf \
    HIVE_CONF_DIR=/etc/hive/conf \
    HDFS_PREFIX=hdfs:// \
    PYTHONPATH=/u/users/******/******/python-lib:/u/users/******/******/python-lib:/u/users/******/pyenv/prod_python_libs/lib/python2.7/site-packages/:$PYTHON_PATH \
    YARN_HOME=/usr/hdp/current/hadoop-yarn-client \
    SPARK_DIST_CLASSPATH=$(hadoop classpath):$(yarn classpath):/etc/hive/conf/hive-site.xml \
    PYSPARK_PYTHON=/usr/bin/python2.7 \
    QQQ_LOC=/u/users/******/three-queues \
    spark-submit \
    --master yarn 'rforest_grid_search.py' \
    --executor-memory 10g \
    --num-executors 8 \
    --executor-cores 10 \
    --conf spark.port.maxRetries=80 \
    --conf spark.dynamicAllocation.enabled=False \
    --conf spark.default.parallelism=6000 \
    --conf spark.sql.shuffle.partitions=6000 \
    --principal ************************ \
    --queue default \
    --name lets_get_starting \
    --keytab /u/users/******/.******.keytab \
    --driver-memory 10g

В этом rforest_grid_search.py Python-скрипт. Существует следующий исходный код, который пытается соединить Grid Search с кластером Spark:

# Spark configuration
from pyspark import SparkContext, SparkConf
conf = SparkConf()
sc = SparkContext(conf=conf)
print('Spark Context:', sc)

# Hyperparameters' grid
parameters = {'n_estimators': list(range(150, 200, 25)), 'criterion': ['gini', 'entropy'], 'max_depth': list(range(2, 11, 2)), 'max_features': [i/10. for i in range(10, 16)], 'class_weight': [{0: 1, 1: i/10.} for i in range(10, 17)], 'min_samples_split': list(range(2, 7))}

# Execute grid search - using spark_sklearn library
from spark_sklearn import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
classifiers_grid = GridSearchCV(sc, estimator=RandomForestClassifier(), param_grid=parameters, scoring='precision', cv=5, n_jobs=-1)
classifiers_grid.fit(X, y)

Когда я запускаю Python-скрипт, я получаю сообщение об ошибке в строке classifiers_grid.fit(X, y), котораявыглядит следующим образом:

ImportError: No module named model_selection._validation

или, если говорить более подробно (но без учета всего, потому что он слишком длинный), это следующее:

...
    ('Spark Context:', <SparkContext master=yarn appName=rforest_grid_search.py>)
...
    18/10/24 12:43:50 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, oser404637.*****.com, executor 2, partition 2, PROCESS_LOCAL, 42500 bytes)
    18/10/24 12:43:50 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, oser404637.*****.com, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/worker.py", line 216, in main
        func, profiler, deserializer, serializer = read_command(pickleSer, infile)
      File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/worker.py", line 58, in read_command
        command = serializer._read_with_length(file)
      File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
        return self.loads(obj)
      File "/u/applic/data/hdfs2/hadoop/yarn/local/usercache/*****/appcache/application_1539785180345_36939/container_e126_1539785180345_36939_01_000003/pyspark.zip/pyspark/serializers.py", line 562, in loads
        return pickle.loads(obj)
    ImportError: No module named model_selection._validation

            at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
...

Когда я запускал тот же скрипт на Pythonно немного изменив (с точки зрения перекрестной проверки), я получил следующую ошибку:

Traceback (most recent call last):
  File "/data/users/******/rforest_grid_search.py", line 126, in <module>
    classifiers_grid.fit(X, y)
  File "/usr/lib/python2.7/site-packages/spark_sklearn/grid_search.py", line 274, in fit
    return self._fit(X, y, groups, ParameterGrid(self.param_grid))
  File "/usr/lib/python2.7/site-packages/spark_sklearn/grid_search.py", line 321, in _fit
    indexed_out0 = dict(par_param_grid.map(fun).collect())
  File "/u/users/******/spark-2.3.0/python/lib/pyspark.zip/pyspark/rdd.py", line 824, in collect
  File "/u/users/******/spark-2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/u/users/******/spark-2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, oser402389.wal-mart.com, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/worker.py", line 216, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/worker.py", line 58, in read_command
    command = serializer._read_with_length(file)
  File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "/u/applic/data/hdfs1/hadoop/yarn/local/usercache/******/appcache/application_1539785180345_42235/container_e126_1539785180345_42235_01_000002/pyspark.zip/pyspark/serializers.py", line 562, in loads
    return pickle.loads(obj)
ImportError: No module named sklearn.base

Как я могу это исправить и выполнить GridSearchCV на кластере Spark?

Означает ли эта ошибка просто, что scikit-learn и / или spark-sklearn не установлены на рабочих узлах Spark (хотя, очевидно, они установлены на узле Spark Edge / Driver, который яиспользуя подключение к кластеру Spark) ?

1 Ответ

0 голосов
/ 25 октября 2018

Означает ли эта ошибка просто, что scikit-learn и / или spark-sklearn не установлены на рабочих узлах Spark

Да, это именно так или точнеечто модули отсутствуют на пути интерпретатора Python, используемого вашими сотрудниками Spark.

В общем случае все модули, которые используются рабочим кодом, должны быть доступны на каждом узле.Существуют различные варианты, в зависимости от сложности зависимостей

  • Установить все зависимости на каждом или в контейнере (если используется).Часто предпочитают, так как нет никаких накладных расходов во время выполнения, и один из них использует оптимизированные собственные библиотеки, если это применимо (крайне важно для высокопроизводительного машинного обучения).
  • Используйте параметры pyfiles для распределения пакетов (обычно eggs) по задаче,Подходит для простых, простых Python-зависимостей, которые не требуют компиляции и не имеют собственных зависимостей.
  • Распространение завершенных виртуальных сред (например, conda) с локальными зависимостями.Может работать в простых случаях, но с большими накладными расходами (большие архивы распределяются с каждой задачей), не работает на кластере со смешанной архитектурой и использует неоптимизированные собственные зависимости.
  • Установка зависимостей Python (если собственныенастоящее время), изнутри задачи - Numpy и статическое связывание
...