Spark не может идентифицировать столбец времени события, который передается в withWatermark () - PullRequest
0 голосов
/ 07 июня 2018

Я пытался найти способ повторной рассылки функций в различных событиях на основе временных отметок событий, используя Spark, и я нашел пример кода, который использует mapGroupsWithState для повторной рассылки событий с использованием временных отметок в их репо.

https://github.com/apache/spark/blob/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

Чтобы быстро проверить, работает ли эта функция сеанса с временными метками событий, я добавил withWatermark («отметка времени», «10 секунд») (рассматривая время обработки как временную метку события) и изменил ProcessingTimeTimeout на EventTimeTimeout.

  val lines = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .option("includeTimestamp", value = true)
  .load()

 // Split the lines into words, treat words as sessionId of events
 val events = lines
  .withWatermark("timestamp", "10 seconds") // added
  .as[(String, Timestamp)]
  .flatMap { case (line, timestamp) =>
    line.split(" ").map(word => Event(sessionId = word, timestamp))
  }

 val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate].(GroupStateTimeout.EventTimeTimeout) {
   ...
  }

  // Start running the query that prints the session updates to the console
 val query = sessionUpdates
  .writeStream
  .outputMode("update")
  .format("console")
  .start()

 query.awaitTermination()

Однако, когда я запустил его, Спарк бросил org.apache.spark.sql.AnalysisException и сказал, что

Водяной знак должен быть указан в запросе с использованием '[Dataset / DataFrame].withWatermark () 'для использования времени ожидания события в [map | flatMap] GroupsWithState.Тайм-аут события не поддерживается без водяного знака`

, что неверно и сбивает с толку, поскольку столбец «отметка времени» явно находится в физическом плане после сообщения об исключении:

...
+- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@394a6d2b,socket,List(),..., [value#2, timestamp#3]

Я что-то пропустил или сделал что-то не так?

Заранее спасибо!

1 Ответ

0 голосов
/ 19 июня 2018

Добавьте водяной знак после операции плоской карты.Это должно работать:

val events = lines
  .as[(String, Timestamp)]
  .flatMap { case (line, timestamp) =>
    line.split(" ").map(word => Event(sessionId = word, timestamp))
  }.withWatermark("timestamp", "10 seconds") 

 val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate].(GroupStateTimeout.EventTimeTimeout) {
   ...
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...