У меня есть два класса scala как часть моей работы на spark-sql, то есть Driver.scala и ExtractorOne.scala.
Driver.scala передает различные параметры, такие как объект sparkSession и т. Д., В различные экстракторы, такие как ExtractorOne.scala и т. Д.
В классах Extractor я извлекаю данные из Oracle и записываю их в виде паркетных файлов в папку hdfs.
В рамках бизнес-логики мне нужно вызвать sparkSession.sql () для выполнения некоторых операций. Но внутри метода extract () класса Extractor / вызывающего класса sparkSession вызывает исключение Nullpointer ... поэтому я попытался проверить это в вызывающей функции, вызвав sparkSession.sql ("show tables"). Show (), который дает результаты т.е. нет проблем с объектом. Где, когда при вызове того же самого, т.е.
Driver.scala
val spark = ConfigUtils.getSparkSession( ...); //spark session initialization successful
val parquetDf = spark.read.format("parquet"); // able to read parquet file data and got the dataframe.
val extractors : LinkedHashMap[String, (DataFrameReader, SparkSession, String, String,String,String) => Unit] = Utils.getAllDefinedExtractors();
///ExtractorOne.scala ExtractorTwo.scala ..etc are extractors as shown in other scala file
for ( key:String <- extractors.keys){
extractors.get(key).map{
spark.sql("show tables").show() ///output
fun => fun(ora_df_options_conf,spark,keyspace,key.trim(),"","")
}
}
'
Вывод spark.sql ("Показать таблицы"). Show () :::
spark.sql("show tables").show()
> Blockquote
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
Но то же самое выдает ошибку в ExtractorOne.scala
'
ExtractorOne.scala
def extract(oraOptionDfConfig: DataFrameReader, sparkSession: SparkSession, keyspace: String,
columnFamilyName: String, fromDate:String , toDate:String ) : Unit ={
val company_df = ..// some opeartion to read the data from oracle to company_df
val dist_df = company_df.distinct("id")
company_df.createOrReplaceTempView("company")
dist_df.foreach( row =>{
if(row.anyNull){
}else{
val sqlQuery:String = s" select * from company where id='%s' and quarter='%s' and year='%s' ".format( row.get(0) , row.get(1) , row.get(2))
sparkSession.sql("show tables").show() ///output...
var partitionDf = sparkSession.sql(sqlQuery)
partitionDf.show(1)
writeAsParquet(...) ///save as parquet file/s
}
}
'
Вывод sparkSession.sql ("Показать таблицы"). Show () :::
ОШИБКА:
Вызывается: java.lang.NullPointerException
в org.apache.spark.sql.SparkSession.sessionState $ lzycompute (SparkSession.scala: 142)
в org.apache.spark.sql.SparkSession.sessionState (SparkSession.scala: 140)
в org.apache.spark.sql.SparkSession.sql (SparkSession.scala: 641)
в com.snp.extractors.CompanyModelValsExtractor $$ anonfun $ extract $ 1.apply (ExtractorOne.scala: 126)
в com.snp.extractors.CompanyModelValsExtractor $$ anonfun $ extract $ 1.apply (ExtractorOne.scala: 113)
в scala.collection.Iterator $ class.foreach (Iterator.scala: 891)
at scala.collection.AbstractIterator.foreach (Iterator.scala: 1334)
'