I am trying to perform a grid search on a Spark cluster. For this reason, I run nohup ./spark_python_shell.sh > output.log & in my bash shell to spin up the Spark cluster, and this also submits my Python script (see the spark-submit \ --master yarn 'dnn_grid_search.py' below):
SPARK_HOME=/u/users/******/spark-2.3.0 \
Q_CORE_LOC=/u/users/******/q-core \
ENV=local \
HIVE_HOME=/usr/hdp/current/hive-client \
SPARK2_HOME=/u/users/******/spark-2.3.0 \
HADOOP_CONF_DIR=/etc/hadoop/conf \
HIVE_CONF_DIR=/etc/hive/conf \
HDFS_PREFIX=hdfs:// \
PYTHONPATH=/u/users/******/q-core/python-lib:/u/users/******/three-queues/python-lib:/u/users/******/pyenv/prod_python_libs/lib/python2.7/site-packages/:$PYTHON_PATH \
YARN_HOME=/usr/hdp/current/hadoop-yarn-client \
SPARK_DIST_CLASSPATH=$(hadoop classpath):$(yarn classpath):/etc/hive/conf/hive-site.xml \
PYSPARK_PYTHON=/usr/bin/python2.7 \
QQQ_LOC=/u/users/******/three-queues \
spark-submit \
--master yarn 'dnn_grid_search.py' \
--executor-memory 10g \
--num-executors 8 \
--executor-cores 10 \
--conf spark.port.maxRetries=80 \
--conf spark.dynamicAllocation.enabled=False \
--conf spark.default.parallelism=6000 \
--conf spark.sql.shuffle.partitions=6000 \
--principal ************************ \
--queue default \
--name lets_get_starting \
--keytab /u/users/******/.******.keytab \
--driver-memory 10g
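For reference, since spark-submit treats everything after the application file as arguments of the application itself, I understand the options should come before 'dnn_grid_search.py'; the reordered command (environment variables unchanged) would presumably look like this:
spark-submit \
--master yarn \
--executor-memory 10g \
--num-executors 8 \
--executor-cores 10 \
--conf spark.port.maxRetries=80 \
--conf spark.dynamicAllocation.enabled=False \
--conf spark.default.parallelism=6000 \
--conf spark.sql.shuffle.partitions=6000 \
--principal ************************ \
--queue default \
--name lets_get_starting \
--keytab /u/users/******/.******.keytab \
--driver-memory 10g \
'dnn_grid_search.py'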
This dnn_grid_search.py Python script contains the following source code, which attempts to connect the grid search to the Spark cluster:
# Create the 'Keras' classifier
# 'keras_model' is a function which returns the compiled 'Keras' model I built
from sklearn.model_selection import GridSearchCV  # shadowed below by spark_sklearn's GridSearchCV
from keras.wrappers.scikit_learn import KerasClassifier
classifier = KerasClassifier(build_fn=keras_model, verbose=0)

# Spark configuration
from pyspark import SparkContext, SparkConf
conf = SparkConf()
sc = SparkContext(conf=conf)

# Execute the grid search using the spark_sklearn library, which distributes
# the cross-validation fits over the Spark cluster
from spark_sklearn import GridSearchCV
classifiers_grid = GridSearchCV(sc, estimator=classifier, param_grid=parameters,
                                scoring=custom_scorer, cv=train_valid_predefined_split,
                                n_jobs=-1)
classifiers_grid.fit(X, y)
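For context, the names used above (keras_model, parameters, custom_scorer, train_valid_predefined_split, X, y) are defined earlier in the script; a minimal sketch of their shape, with placeholder layer sizes, hyperparameter values, and split sizes, is:
import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from sklearn.metrics import make_scorer, f1_score
from sklearn.model_selection import PredefinedSplit

def keras_model(optimizer='adam'):
    # Build and compile a small feed-forward network
    # (layer sizes and input_dim=100 are placeholders)
    model = Sequential()
    model.add(Dense(64, activation='relu', input_dim=100))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer=optimizer,
                  metrics=['accuracy'])
    return model

# Hyperparameter grid explored by GridSearchCV (values are placeholders)
parameters = {'optimizer': ['adam', 'rmsprop'],
              'batch_size': [128, 256],
              'epochs': [10, 20]}

# Custom scorer built from a scikit-learn metric
custom_scorer = make_scorer(f1_score)

# One fixed train/validation split: -1 marks training rows, 0 validation rows
# (here: the first 8000 of 10000 rows train, the rest validate - placeholders)
test_fold = np.concatenate([np.full(8000, -1), np.zeros(2000)])
train_valid_predefined_split = PredefinedSplit(test_fold=test_fold)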
When I run the Python script, I get an error message at the line classifiers_grid.fit(X, y) which looks like this:
ImportError: No module named keras.wrappers.scikit_learn
or in more detail (not in full, because it is too long):
18/10/23 12:53:38 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, *******.*******.com, executor 2, partition 2, PROCESS_LOCAL, 42479 bytes)
18/10/23 12:53:38 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, *******.*******.com, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/u/applic/data/hdfs12/hadoop/yarn/local/usercache/*******/appcache/application_1539785180345_30553/container_e126_1539785180345_30553_01_000003/pyspark.zip/pyspark/worker.py", line 216, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/u/applic/data/hdfs12/hadoop/yarn/local/usercache/*******/appcache/application_1539785180345_30553/container_e126_1539785180345_30553_01_000003/pyspark.zip/pyspark/worker.py", line 60, in read_command
command = serializer.loads(command.value)
File "/u/applic/data/hdfs12/hadoop/yarn/local/usercache/*******/appcache/application_1539785180345_30553/container_e126_1539785180345_30553_01_000003/pyspark.zip/pyspark/serializers.py", line 562, in loads
return pickle.loads(obj)
ImportError: No module named keras.wrappers.scikit_learn
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
How can I use GridSearchCV with Keras models on a Spark cluster (either in general or specifically with the spark_sklearn library)?
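Judging from the traceback, the unpickling happens inside pyspark/worker.py on the executors, so keras apparently has to be importable by the PYSPARK_PYTHON interpreter (/usr/bin/python2.7) on every worker node, not only on the driver where I submit the job. A small diagnostic I am thinking of running to confirm this (a sketch; the task and partition counts are arbitrary):
def probe(_):
    # Runs on an executor: report the host name and whether keras imports there
    import socket
    try:
        import keras  # noqa: F401
        return (socket.gethostname(), 'keras importable')
    except ImportError as exc:
        return (socket.gethostname(), 'ImportError: %s' % exc)

# Spread 100 dummy tasks over 20 partitions so several executors are hit
print(set(sc.parallelize(range(100), 20).map(probe).collect()))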