Spark - структурированное потоковое произвольное состояние GroupState.hasTimedOut никогда не происходит с flatMapGroupWithState - PullRequest
0 голосов
/ 10 июля 2019

hasTimedOut метод никогда не true в моей функции обработки произвольного состояния updateState

def updateWithEvent(tagCount: TagCount, inputsSize: Int): TagCount = {
  TagCount(tagCount.tag, tagCount.count + inputsSize)
}

def updateState(
                 tag: String,
                 inputs: Iterator[ExtendedTweet],
                 oldState: GroupState[TagCount]
               ): Iterator[TagsStatus] = {

  val state = oldState.getOption.getOrElse(TagCount(tag, 0))
  val rows = inputs.toSeq.sortBy(_.createdAt.getTime)
  val newState = updateWithEvent(state, rows.length)

  rows.toIterator.flatMap { input =>
    if (oldState.hasTimedOut) {
      println("hasTimedOut is never true")
      oldState.remove()
      Iterator(TagsStatus(input.createdAt, input.post, input.author, input.tag, 1))
    } else {
      oldState.update(newState)
      oldState.setTimeoutTimestamp(input.createdAt.getTime, "30 seconds")
      Iterator(TagsStatus(input.createdAt, input.post, input.author, newState.tag, newState.count))
    }
  }
}

называется

val memorySink = tweetsStream
  .withWatermark("createdAt", "30 seconds")
  .groupByKey(_.tag)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.EventTimeTimeout())(updateState)
  .writeStream
  .format("memory")
  .outputMode("update")
  .queryName("tweets_stream")
  .start()

С этими параметрами я получил такой вывод:

+-------------------+--------------------+-------------------+-------+---------+
|          createdAt|                post|             author|    tag|tagsCount|
+-------------------+--------------------+-------------------+-------+---------+
|2019-07-10 18:20:22|Lots of great con...|        Seth Martin|bigdata|        1|
|2019-07-10 18:20:29|DF > Machine lear...|  Mr Data Scientist|bigdata|        2|
|2019-07-10 18:20:31|Samsung Galaxy Wa...|              NoSQL|bigdata|        3|
|2019-07-10 18:20:42|Sunday Briefing #...|  Mr Data Scientist|bigdata|        4|
|2019-07-10 18:20:42|CCA131 Demystify ...|              niken|bigdata|        6|
|2019-07-10 18:20:44|Setting alerts in...|              NoSQL|bigdata|        6|
|2019-07-10 18:20:47|CCA131 Demystify ...|   Javascript30 Bot|bigdata|       11|
|2019-07-10 18:20:47|CCA131 Demystify ...|100 Days Of ML Code|bigdata|       11|
|2019-07-10 18:20:47|CCA131 Demystify ...|     BFTawfik Bot 2|bigdata|       11|
|2019-07-10 18:20:47|CCA131 Demystify ...|        CodersNotes|bigdata|       11|
|2019-07-10 18:20:47|CCA131 Demystify ...|           Adrinbot|bigdata|       11|
+-------------------+--------------------+-------------------+-------+---------+

Я ожидаю удалить старое состояние по истечении времени ожидания окна. И снова посчитать метки от 1. Не из числа подсчетов из предыдущего состояния

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...