Использование isTimingOut в MapWithState - PullRequest
0 голосов
/ 03 сентября 2018

Я пытаюсь использовать isTimingOut для удаления состояния и отправки его в выходной Dstream. Вот мой код

  val mappingFunc = (key: String, value: Option[Person1], state: State[Output]) => {

    val defaultPerson = Output("", "", 1L, Long.MaxValue, 0, Some("str"))

    val defaultValues  =
      state
        .getOption()
        .getOrElse(defaultPerson)

    //Do some calculations to get the updated value

    val maxi = value.get.time max defaultValues.defaultMinValue
    val mini = value.get.time min defaultValues.defaultMaxValue

    val first = if (!value.get.school.isEmpty && defaultValues.school == "")
    {
      value.get.school
    }
    else
    {
      defaultValues.school
    }
    val counter = defaultValues.counter + 1


    val last = if (value.get.degree != None) //An update for a non null value
    {
      value.get.degree
    }
    else
    {
      Some(defaultValues.degree.get)
    }



    val output = Output(key, first, maxi, mini, counter, last)

    if (state.isTimingOut()) {
      println("State has timed out ...")
      state.getOption()
    }
    else {
      println("State is updated")
      state.update(output)
    }

  }

Работает нормально. Но когда я использую isTimingOut, я не получаю ожидаемых результатов. Все, что я хочу, - это если тайм-аут ключа, я хочу, чтобы он был удален из состояния, в противном случае он остается в своем состоянии.

Я звоню по номеру

val stateDstream = pairs.mapWithState(
  StateSpec.function(mappingFunc).timeout(Seconds(60)))
...