Ошибка «Нет плана для EventTimeWatermark» при использовании структурированной потоковой передачи с сокращением столбцов (spark 2.3.1) - PullRequest
0 голосов
/ 24 апреля 2019

Я получаю сообщение об ошибке «Нет плана для EventTimeWatermark» при выполнении запроса с удалением столбцов с использованием структурированной потоковой передачи с настраиваемым источником данных, который реализует источник данных Spark v2.

Моя реализация источника данных, которая обрабатывает схемы, включает следующее:

class MyDataSourceReader extends DataSourceReader with  SupportsPushDownRequiredColumns { 
    var schema: StructType = createSchema()

    override def readSchema(): StructType = schema

    override def pruneColumns(requiredSchema: StructType) = {
        this.schema = requiredSchema
    }

и затем:

class MyDataSourceReaderStream extends MyDataSourceReader { ...

Это мой тестовый код:

def x(): Unit = {
        val df1 = sparkSession.readStream.format(myV2Source).load()

        val df2 = df1
                .withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
                .withWatermark("epoch", "1 milliseconds")
                .groupBy(col("epoch"), col("id")).count()

        val streamingQuery = df2
                .writeStream
                .format("console")
                .trigger(Trigger.ProcessingTime("10 seconds"))
                .outputMode(OutputMode.Append())
                .start()

        streamingQuery.awaitTermination()
   }

Я получаю следующее исключение:

Caused by: java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 30000.0), 0) * 30.0) as timestamp) AS epoch#201, id#367L]
   +- DataSourceV2Relation [epoch#320L, id#367L], com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c

at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

Обратите внимание, что в логическом плане я получил DataSourceV2Relation , а не StreamingDataSourceV2Relation , хотя я использую потоковую передачу.

Буду признателен за помощь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...