Я получаю сообщение об ошибке «Нет плана для EventTimeWatermark» при выполнении запроса с удалением столбцов с использованием структурированной потоковой передачи с настраиваемым источником данных, который реализует источник данных Spark v2.
Моя реализация источника данных, которая обрабатывает схемы, включает следующее:
class MyDataSourceReader extends DataSourceReader with SupportsPushDownRequiredColumns {
var schema: StructType = createSchema()
override def readSchema(): StructType = schema
override def pruneColumns(requiredSchema: StructType) = {
this.schema = requiredSchema
}
и затем:
class MyDataSourceReaderStream extends MyDataSourceReader { ...
Это мой тестовый код:
def x(): Unit = {
val df1 = sparkSession.readStream.format(myV2Source).load()
val df2 = df1
.withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
.withWatermark("epoch", "1 milliseconds")
.groupBy(col("epoch"), col("id")).count()
val streamingQuery = df2
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode(OutputMode.Append())
.start()
streamingQuery.awaitTermination()
}
Я получаю следующее исключение:
Caused by: java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 30000.0), 0) * 30.0) as timestamp) AS epoch#201, id#367L]
+- DataSourceV2Relation [epoch#320L, id#367L], com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
Обратите внимание, что в логическом плане я получил DataSourceV2Relation , а не StreamingDataSourceV2Relation , хотя я использую потоковую передачу.
Буду признателен за помощь.