val query1 = spark.sql(s"select * from tmp_table where id IN (select id from tmp_table where date between '${fromDate}' and '$toDate' group by id having count(id) > 1)")
var finalTable = Seq[Template]()
query1.foreach { row =>
//Query on finalRtnView and get latest record for that row
//Update finalTable for that row
import spark.implicits._
finalTable = finalTable :+ finalTableTemplate
//Convert to DF so that data is updated .
val df = finalTable.toDF().createOrReplaceTempView("finalRtnView") //Throws Exception here
}
java .lang.NullPointerException в org. apache .spark. sql .SQLImplicits.localSeqToDatasetHolder (SQLImplicits. scala: 213) в scala .collection.Iterator $ class.foreach (Iterator. scala: 893) в scala .collection.AbstractIterator.foreach (Iterator. scala: 1336) в org. apache .spark.rdd.RDD $$ anonfun $ foreach $ 1 $ $ anonfun $ apply $ 28.apply (СДР. scala: 918) в org. apache .spark.rdd.RDD $$ anonfun $ foreach $ 1 $$ anonfun $ apply $ 28.apply (СДР. scala: 918 ) в орг. apache .spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext. scala: 2062) в орг. apache .spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext. scala: 2062) в орг. apache .spark.scheduler.ResultTask.runTask (ResultTask. scala: 87) в орг. apache .spark.scheduler.Task.run (Задача. scala: 108 ) в org. apache .spark.executor.Executor $ TaskRunner.run (Исполнитель. scala: 335) в java .util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor. java: 1149) в java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) в java. lang.Thread.run (Thread. java: 748)
То же самое работает, если я использую для l oop здесь вместо foreach. В случае l oop обработка очень медленная.
Как я могу изменить этот код, чтобы он работал быстрее? Мне нужен обновленный фрейм данных каждый раз в течение l oop.