Я пытаюсь получить данные из Кассандры, используя искровой фрейм данных.Ниже приведен код для извлечения данных из Кассандры и трассировки стека ошибок.Пожалуйста, помогите мне решить проблему.
sparkSession.sql не получает никаких данных, "res" фрейм данных пуст.в основном он не находит подходящих записей.
def getCurrentTrip(s_id1: Long, a_id1: String, summ_typ1: String, summ_dt1:Date, trp_summ_Id1: String): Boolean = {
var foundtrip = false
// val selectTripQuery = "select * from ap.ts where service_id = ? and asset_id = ? and summ_typ = ? and summ_dt = ? and trp_summ_id = ?;"
val df_read2 = sparkSession.read
.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host","host")
.option("spark.cassandra.connection.port","9042")
.option( "spark.cassandra.auth.username","user")
.option("spark.cassandra.auth.password","pass")
.option("keyspace","ap")
.option("table","ts")
.load()
df_read2.createOrReplaceTempView("query_data2")
val sqlDate = summ_dt1.getTime()
var res = sparkSession.sql(s"""select * from query_data2 where s_id=$service_id1 and a_id=$asset_id1 and summ_typ='T' and summ_dt=timestamp($sqlDate) and trp_summ_id=("$trp_summ_Id1")""".stripMargin)
println("one..")
res.show()
println("two...")
// val row = res.first()
if (res.count>0) {
println ("Found Trip")
foundtrip = true
} else {
println ("Not Found")
foundtrip = false
}
foundtrip
}
18/09/25 19:58:28 ERROR app.ProcessMPacket$: error for processing this event For M-packet
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
at scala.collection.IterableLike$class.head(IterableLike.scala:107)
at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:186)
at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:186)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2491)
at org.apache.spark.sql.Dataset.first(Dataset.scala:2498)
at com.vzt.afm.hum.dh.util.CassandraUtils$.getDashBoardData(CassandraUtils.scala:165)
at com.vzt.afm.hum.dh.app.TripAggregation$.updateOdometer(TripAggregation.scala:84)
at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1$$anonfun$apply$1.apply(ProcessMPacket.scala:175)
at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1$$anonfun$apply$1.apply(ProcessMPacket.scala:129)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:129)
at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)