Я пытаюсь использовать isTimingOut для удаления состояния и отправки его в выходной Dstream. Вот мой код
val mappingFunc = (key: String, value: Option[Person1], state: State[Output]) => {
val defaultPerson = Output("", "", 1L, Long.MaxValue, 0, Some("str"))
val defaultValues =
state
.getOption()
.getOrElse(defaultPerson)
//Do some calculations to get the updated value
val maxi = value.get.time max defaultValues.defaultMinValue
val mini = value.get.time min defaultValues.defaultMaxValue
val first = if (!value.get.school.isEmpty && defaultValues.school == "")
{
value.get.school
}
else
{
defaultValues.school
}
val counter = defaultValues.counter + 1
val last = if (value.get.degree != None) //An update for a non null value
{
value.get.degree
}
else
{
Some(defaultValues.degree.get)
}
val output = Output(key, first, maxi, mini, counter, last)
if (state.isTimingOut()) {
println("State has timed out ...")
state.getOption()
}
else {
println("State is updated")
state.update(output)
}
}
Работает нормально. Но когда я использую isTimingOut, я не получаю ожидаемых результатов. Все, что я хочу, - это если тайм-аут ключа, я хочу, чтобы он был удален из состояния, в противном случае он остается в своем состоянии.
Я звоню по номеру
val stateDstream = pairs.mapWithState(
StateSpec.function(mappingFunc).timeout(Seconds(60)))