Я пытался найти способ повторной рассылки функций в различных событиях на основе временных отметок событий, используя Spark, и я нашел пример кода, который использует mapGroupsWithState
для повторной рассылки событий с использованием временных отметок в их репо.
https://github.com/apache/spark/blob/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
Чтобы быстро проверить, работает ли эта функция сеанса с временными метками событий, я добавил withWatermark («отметка времени», «10 секунд») (рассматривая время обработки как временную метку события) и изменил ProcessingTimeTimeout
на EventTimeTimeout
.
val lines = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", value = true)
.load()
// Split the lines into words, treat words as sessionId of events
val events = lines
.withWatermark("timestamp", "10 seconds") // added
.as[(String, Timestamp)]
.flatMap { case (line, timestamp) =>
line.split(" ").map(word => Event(sessionId = word, timestamp))
}
val sessionUpdates = events
.groupByKey(event => event.sessionId)
.mapGroupsWithState[SessionInfo, SessionUpdate].(GroupStateTimeout.EventTimeTimeout) {
...
}
// Start running the query that prints the session updates to the console
val query = sessionUpdates
.writeStream
.outputMode("update")
.format("console")
.start()
query.awaitTermination()
Однако, когда я запустил его, Спарк бросил org.apache.spark.sql.AnalysisException
и сказал, что
Водяной знак должен быть указан в запросе с использованием '[Dataset / DataFrame].withWatermark () 'для использования времени ожидания события в [map | flatMap] GroupsWithState.Тайм-аут события не поддерживается без водяного знака`
, что неверно и сбивает с толку, поскольку столбец «отметка времени» явно находится в физическом плане после сообщения об исключении:
...
+- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@394a6d2b,socket,List(),..., [value#2, timestamp#3]
Я что-то пропустил или сделал что-то не так?
Заранее спасибо!