hasTimedOut
метод никогда не true
в моей функции обработки произвольного состояния updateState
def updateWithEvent(tagCount: TagCount, inputsSize: Int): TagCount = {
TagCount(tagCount.tag, tagCount.count + inputsSize)
}
def updateState(
tag: String,
inputs: Iterator[ExtendedTweet],
oldState: GroupState[TagCount]
): Iterator[TagsStatus] = {
val state = oldState.getOption.getOrElse(TagCount(tag, 0))
val rows = inputs.toSeq.sortBy(_.createdAt.getTime)
val newState = updateWithEvent(state, rows.length)
rows.toIterator.flatMap { input =>
if (oldState.hasTimedOut) {
println("hasTimedOut is never true")
oldState.remove()
Iterator(TagsStatus(input.createdAt, input.post, input.author, input.tag, 1))
} else {
oldState.update(newState)
oldState.setTimeoutTimestamp(input.createdAt.getTime, "30 seconds")
Iterator(TagsStatus(input.createdAt, input.post, input.author, newState.tag, newState.count))
}
}
}
называется
val memorySink = tweetsStream
.withWatermark("createdAt", "30 seconds")
.groupByKey(_.tag)
.flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.EventTimeTimeout())(updateState)
.writeStream
.format("memory")
.outputMode("update")
.queryName("tweets_stream")
.start()
С этими параметрами я получил такой вывод:
+-------------------+--------------------+-------------------+-------+---------+
| createdAt| post| author| tag|tagsCount|
+-------------------+--------------------+-------------------+-------+---------+
|2019-07-10 18:20:22|Lots of great con...| Seth Martin|bigdata| 1|
|2019-07-10 18:20:29|DF > Machine lear...| Mr Data Scientist|bigdata| 2|
|2019-07-10 18:20:31|Samsung Galaxy Wa...| NoSQL|bigdata| 3|
|2019-07-10 18:20:42|Sunday Briefing #...| Mr Data Scientist|bigdata| 4|
|2019-07-10 18:20:42|CCA131 Demystify ...| niken|bigdata| 6|
|2019-07-10 18:20:44|Setting alerts in...| NoSQL|bigdata| 6|
|2019-07-10 18:20:47|CCA131 Demystify ...| Javascript30 Bot|bigdata| 11|
|2019-07-10 18:20:47|CCA131 Demystify ...|100 Days Of ML Code|bigdata| 11|
|2019-07-10 18:20:47|CCA131 Demystify ...| BFTawfik Bot 2|bigdata| 11|
|2019-07-10 18:20:47|CCA131 Demystify ...| CodersNotes|bigdata| 11|
|2019-07-10 18:20:47|CCA131 Demystify ...| Adrinbot|bigdata| 11|
+-------------------+--------------------+-------------------+-------+---------+
Я ожидаю удалить старое состояние по истечении времени ожидания окна. И снова посчитать метки от 1. Не из числа подсчетов из предыдущего состояния