Keras / TensorFlow GridSearchCV on a Spark cluster
Asked 23 October 2018

I am trying to run a Grid Search on a Spark cluster. To do so, I run nohup ./spark_python_shell.sh > output.log & in my bash shell to launch the Spark cluster, and the same script also submits my Python script (see spark-submit \ --master yarn 'dnn_grid_search.py' below):

    SPARK_HOME=/u/users/******/spark-2.3.0 \
    Q_CORE_LOC=/u/users/******/q-core \
    ENV=local \
    HIVE_HOME=/usr/hdp/current/hive-client \
    SPARK2_HOME=/u/users/******/spark-2.3.0 \
    HADOOP_CONF_DIR=/etc/hadoop/conf \
    HIVE_CONF_DIR=/etc/hive/conf \
    HDFS_PREFIX=hdfs:// \
    PYTHONPATH=/u/users/******/q-core/python-lib:/u/users/******/three-queues/python-lib:/u/users/******/pyenv/prod_python_libs/lib/python2.7/site-packages/:$PYTHON_PATH \
    YARN_HOME=/usr/hdp/current/hadoop-yarn-client \
    SPARK_DIST_CLASSPATH=$(hadoop classpath):$(yarn classpath):/etc/hive/conf/hive-site.xml \
    PYSPARK_PYTHON=/usr/bin/python2.7 \
    QQQ_LOC=/u/users/******/three-queues \
    spark-submit \
    --master yarn 'dnn_grid_search.py' \
    --executor-memory 10g \
    --num-executors 8 \
    --executor-cores 10 \
    --conf spark.port.maxRetries=80 \
    --conf spark.dynamicAllocation.enabled=False \
    --conf spark.default.parallelism=6000 \
    --conf spark.sql.shuffle.partitions=6000 \
    --principal ************************ \
    --queue default \
    --name lets_get_starting \
    --keytab /u/users/******/.******.keytab \
    --driver-memory 10g
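
One thing worth double-checking in the submission above: spark-submit treats everything that follows the application file as arguments to that application, so the options placed after 'dnn_grid_search.py' (executor memory, number of executors, the spark.* confs, etc.) are most likely being handed to the script itself rather than to Spark. A sketch of the same call with the application file moved to the end (same values, just reordered):

    spark-submit \
    --master yarn \
    --executor-memory 10g \
    --num-executors 8 \
    --executor-cores 10 \
    --conf spark.port.maxRetries=80 \
    --conf spark.dynamicAllocation.enabled=False \
    --conf spark.default.parallelism=6000 \
    --conf spark.sql.shuffle.partitions=6000 \
    --principal ************************ \
    --queue default \
    --name lets_get_starting \
    --keytab /u/users/******/.******.keytab \
    --driver-memory 10g \
    'dnn_grid_search.py'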

Inside this dnn_grid_search.py Python script there is the following source code, which tries to connect the Grid Search to the Spark cluster:

    # Create the 'Keras' classifier
    # 'keras_model' is a function which returns the 'Keras' model which I built
    from sklearn.model_selection import GridSearchCV
    from keras.wrappers.scikit_learn import KerasClassifier
    classifier = KerasClassifier(build_fn=keras_model, verbose=0)

    # Spark configuration
    from pyspark import SparkContext, SparkConf
    conf = SparkConf()
    sc = SparkContext(conf=conf)

    # Execute grid search - using spark_sklearn library
    from spark_sklearn import GridSearchCV
    classifiers_grid = GridSearchCV(sc, estimator=classifier, param_grid=parameters, scoring=custom_scorer,
                                    cv=train_valid_predefined_split, n_jobs=-1)
    classifiers_grid.fit(X, y)
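
For reference, parameters, custom_scorer and train_valid_predefined_split are defined elsewhere in dnn_grid_search.py; presumably a hyperparameter grid, an sklearn scorer and a predefined train/validation split. A purely hypothetical, minimal example of what such definitions could look like (the names and values below are illustrative, not the ones actually used):

    # Hypothetical placeholders -- the real definitions live elsewhere in dnn_grid_search.py
    from sklearn.metrics import make_scorer, accuracy_score
    from sklearn.model_selection import PredefinedSplit

    # grid of hyperparameters passed to KerasClassifier / keras_model
    parameters = {'batch_size': [32, 64], 'epochs': [10, 20]}

    # any sklearn-compatible scorer
    custom_scorer = make_scorer(accuracy_score)

    # -1 = row used only for training, 0 = row belongs to the single validation fold
    train_valid_predefined_split = PredefinedSplit(test_fold=[-1, -1, -1, -1, 0, 0])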

When I run the Python script, I get an error at the line classifiers_grid.fit(X, y), which looks like this:

ImportError: No module named keras.wrappers.scikit_learn

or, in more detail (not the whole thing, because it is too long):

18/10/23 12:53:38 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, *******.*******.com, executor 2, partition 2, PROCESS_LOCAL, 42479 bytes)
18/10/23 12:53:38 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, *******.*******.com, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/u/applic/data/hdfs12/hadoop/yarn/local/usercache/*******/appcache/application_1539785180345_30553/container_e126_1539785180345_30553_01_000003/pyspark.zip/pyspark/worker.py", line 216, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/u/applic/data/hdfs12/hadoop/yarn/local/usercache/*******/appcache/application_1539785180345_30553/container_e126_1539785180345_30553_01_000003/pyspark.zip/pyspark/worker.py", line 60, in read_command
    command = serializer.loads(command.value)
  File "/u/applic/data/hdfs12/hadoop/yarn/local/usercache/*******/appcache/application_1539785180345_30553/container_e126_1539785180345_30553_01_000003/pyspark.zip/pyspark/serializers.py", line 562, in loads
    return pickle.loads(obj)
ImportError: No module named keras.wrappers.scikit_learn

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

How can GridSearchCV be used with Keras models on a Spark cluster (in general, or specifically with the spark_sklearn library)?
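
For context on what the traceback above seems to indicate: the ImportError is raised inside a YARN executor, so it is the worker-side Python interpreter (PYSPARK_PYTHON=/usr/bin/python2.7 on the cluster nodes) that cannot import keras when the pickled KerasClassifier is deserialized; whatever is installed on the driver machine does not help there. A minimal sketch of one commonly used way to make the dependencies visible to the executors on YARN, assuming conda and the conda-pack tool are available (the environment and archive names below are made up for illustration):

    # build and pack an environment that contains the worker-side dependencies
    conda create -y -n grid_search_env python=2.7
    source activate grid_search_env
    pip install keras tensorflow scikit-learn spark-sklearn
    conda pack -n grid_search_env -o grid_search_env.tar.gz

    # ship the archive with the job and point the executors' Python at the unpacked copy
    export PYSPARK_DRIVER_PYTHON=python              # local interpreter for the driver (client mode)
    export PYSPARK_PYTHON=./grid_search_env/bin/python
    spark-submit \
    --master yarn \
    --archives grid_search_env.tar.gz#grid_search_env \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./grid_search_env/bin/python \
    'dnn_grid_search.py'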

...