Ошибка при выполнении с использованием ./bin/spark-submit в PySpark - PullRequest
0 голосов
/ 01 ноября 2018

Я получаю приведенную ниже ошибку при выполнении кода из командной строки в centOS.

"(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError(u'An error occurred while calling o313.save.\n', JavaObject id=o315), <traceback object at 0x7fca49970320>)"

Эта проблема возникает только тогда, когда я отправляю ее через ./bin/spark-submit test.py

Если я использую только: spark-submit test.py все работает нормально. Но я не могу запустить код в yarn с ним.

У меня установлена ​​anaconda на моей машине, и я думаю, что во втором методе используется naconda spark-submit.

Может кто-нибудь подсказать, что с этим делать? Я установил переменные env или обновил библиотеки?

Edit:

Согласно комментариям, предоставляющим более подробную информацию о скрипте и версиях

Это код:

from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession,DataFrame
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoderEstimator, OneHotEncoder
import sys
from operator import add
import os
import pandas as pd
import numpy as np
from pyspark.sql.types import *
import pyspark.sql.functions as func
from IPython.display import display, HTML
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression,LogisticRegressionModel

path = 'hdfs://10.0.15.42/nih-poc-public-dataset/'

pipeline_path = '/opt/nih/pipeline_model'


try:   
    conf = SparkConf().setMaster("local[8]").setAppName("sparkstream").set("spark.driver.allowMultipleContexts", "true")

    sc = SparkContext(conf = conf)
    sqlContext = SQLContext(sc)

    spark = SparkSession.builder\
            .config(conf = conf)\
            .getOrCreate()

    print "SparkContext Version:", sc.version  
    print "SparkContext Version:", sc.version
    print "Python version: ", sc.pythonVer
    print "Master URL to connect to: ", sc.master
    print "Path where Spark is installed on worker nodes: ", str(sc.sparkHome)
    print "Retrieve name of the Spark User running SparkContext: ", str(sc.sparkUser())
    print "Application name: ", sc.appName
    print "Application ID: ", sc.applicationId
    print "Default level of parallelism: ", sc.defaultParallelism
    print "Default number of partitions for RDDs: ", sc.defaultMinPartitions

    ########################################################################################################
    ############################################# DATA EXTRACTION ##########################################
    ########################################################################################################
    file_name = path + 'maintenance_data.csv'

    df_nih = spark.read.csv(file_name,sep = ';',header = "true",inferSchema="true")   
    df_nih.show(1)

    #print(df_nih.columns)

    print(df_nih.count())
    ########################################################################################################



    ########################################################################################################
    ######################################## PIPELINE FORMATION ############################################
    ########################################################################################################
    categoricalcolumns = ['team','provider']
    numericalcolumns = ['lifetime', 'pressureInd', 'moistureInd', 'temperatureInd']

    stages = []

    for categoricalcol in categoricalcolumns:
        stringindexer = StringIndexer(inputCol = categoricalcol, outputCol = categoricalcol + '_index')
        encoder = OneHotEncoderEstimator(inputCols=[stringindexer.getOutputCol()], outputCols=[categoricalcol + "_classVec"])

        stages += [stringindexer,encoder]
        #stages += [stringindexer]

    assemblerinputs = [c + "_classVec" for c in categoricalcolumns] + numericalcolumns
    vectorassembler_stage = VectorAssembler(inputCols = assemblerinputs 
                                ,outputCol='features')

    stages += [vectorassembler_stage]

    #output = assembler.transform(df_nih)

    #print(output.show(1))

    indexer = StringIndexer(inputCol = 'broken', outputCol = 'label')

    stages += [indexer]

    #pipeline = Pipeline(stages = [stages,vectorassembler_stage,indexer])

    pipeline = Pipeline(stages = stages)

    model_pipeline = pipeline.fit(df_nih)

    model_pipeline.write().overwrite().save(pipeline_path)
except:
    print(sys.exc_info())

finally:
    print('stopping spark session')
    spark.stop()
    sc.stop()

Это вывод:

> /usr/hdp/3.0.0.0-1634/spark2> ./bin/spark-submit --master yarn
> /opt/nih/sample_spark.py SparkContext Version: 2.3.1.3.0.0.0-1634
> SparkContext Version: 2.3.1.3.0.0.0-1634 Python version:  2.7 Master
> URL to connect to:  yarn Path where Spark is installed on worker
> nodes:  None Retrieve name of the Spark User running SparkContext: 
> spark Application name:  sparkstream Application ID: 
> application_1540925663097_0023 Default level of parallelism:  2
> Default number of partitions for RDDs:  2
> +--------+------+----------------+----------------+----------------+-----+---------+ |lifetime|broken|     pressureInd|     moistureInd|  temperatureInd|
> team| provider|
> +--------+------+----------------+----------------+----------------+-----+---------+ |      56|    
> 0|92.1788540640753|104.230204454489|96.5171587259733|TeamA|Provider4|
> +--------+------+----------------+----------------+----------------+-----+---------+ only showing top 1 row
> 
> 999 (<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError(u'An error
> occurred while calling o309.save.\n', JavaObject id=o311), <traceback
> object at 0x7f9061c86290>) stopping spark session

Если я прокомментирую часть сохранения (model_pipeline.write), все работает нормально. Пожалуйста, предоставьте ваши предложения

...