sparkSession.sql, выбрасывающий NullPointerException - PullRequest
0 голосов
/ 16 января 2019

У меня есть два класса scala как часть моей работы на spark-sql, то есть Driver.scala и ExtractorOne.scala.

Driver.scala передает различные параметры, такие как объект sparkSession и т. Д., В различные экстракторы, такие как ExtractorOne.scala и т. Д.

В классах Extractor я извлекаю данные из Oracle и записываю их в виде паркетных файлов в папку hdfs.

В рамках бизнес-логики мне нужно вызвать sparkSession.sql () для выполнения некоторых операций. Но внутри метода extract () класса Extractor / вызывающего класса sparkSession вызывает исключение Nullpointer ... поэтому я попытался проверить это в вызывающей функции, вызвав sparkSession.sql ("show tables"). Show (), который дает результаты т.е. нет проблем с объектом. Где, когда при вызове того же самого, т.е.


Driver.scala

    val spark = ConfigUtils.getSparkSession( ...); //spark session initialization successful

      val parquetDf = spark.read.format("parquet"); // able to read parquet file data and got the dataframe.


      val extractors :  LinkedHashMap[String, (DataFrameReader, SparkSession, String, String,String,String) => Unit] = Utils.getAllDefinedExtractors(); 
      ///ExtractorOne.scala  ExtractorTwo.scala ..etc are extractors as shown in other scala file


      for ( key:String <- extractors.keys){

                extractors.get(key).map{

                    spark.sql("show tables").show()  ///output

                   fun => fun(ora_df_options_conf,spark,keyspace,key.trim(),"","")
             }
            }

'

Вывод spark.sql ("Показать таблицы"). Show () :::

 spark.sql("show tables").show()



> Blockquote

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

Но то же самое выдает ошибку в ExtractorOne.scala

'

ExtractorOne.scala

def extract(oraOptionDfConfig: DataFrameReader, sparkSession: SparkSession, keyspace: String,
        columnFamilyName: String, fromDate:String , toDate:String ) : Unit ={

        val company_df  =  ..// some opeartion to read the data from oracle to company_df
        val  dist_df = company_df.distinct("id")

         company_df.createOrReplaceTempView("company")

         dist_df.foreach( row =>{

           if(row.anyNull){

           }else{


              val sqlQuery:String = s" select * from company  where id='%s' and quarter='%s' and year='%s' ".format( row.get(0) , row.get(1) , row.get(2))



              sparkSession.sql("show tables").show() ///output...

              var partitionDf = sparkSession.sql(sqlQuery)

              partitionDf.show(1)

               writeAsParquet(...) ///save as parquet file/s
           }


}

'

Вывод sparkSession.sql ("Показать таблицы"). Show () :::

ОШИБКА:


Вызывается: java.lang.NullPointerException в org.apache.spark.sql.SparkSession.sessionState $ lzycompute (SparkSession.scala: 142) в org.apache.spark.sql.SparkSession.sessionState (SparkSession.scala: 140) в org.apache.spark.sql.SparkSession.sql (SparkSession.scala: 641) в com.snp.extractors.CompanyModelValsExtractor $$ anonfun $ extract $ 1.apply (ExtractorOne.scala: 126) в com.snp.extractors.CompanyModelValsExtractor $$ anonfun $ extract $ 1.apply (ExtractorOne.scala: 113) в scala.collection.Iterator $ class.foreach (Iterator.scala: 891) at scala.collection.AbstractIterator.foreach (Iterator.scala: 1334)

  '  

1 Ответ

0 голосов
/ 16 января 2019

вы не можете использовать SparkSession в коде на стороне исполнителя (то есть в dist_df.foreach -петле), в этом случае Spark Session является нулевым (он живет только в драйвере)

...