Структурированная потоковая передача искр: тайм-аут не срабатывает - PullRequest
1 голос
/ 05 марта 2020

Я установил длительность тайм-аута на «2 минуты» следующим образом:

  def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject],
                          oldState: GroupState[MyState]): OutputRow = {

    println("$$$$ Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3)
    var state: MyState = if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3)

    if (oldState.hasTimedOut) {
      println("@@@@@ oldState has timed out @@@@")
      // Logic to Write OutputRow
      OutputRow("some values here...")
    } else {
      for (input <- inputs) {
        state = updateWithEvent(state, input)
        oldState.update(state)
        oldState.setTimeoutDuration("2 minutes")
      }
      OutputRow(null, null, null)
    }

  }

Я также указал ProcessingTimeTimeout в 'mapGroupsWithState' следующим образом ...

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)

Но hasTimedOut никогда не соответствует действительности, поэтому я не получаю никакого вывода! Что я делаю не так?

1 Ответ

0 голосов
/ 05 марта 2020

Кажется, это работает, только если входные данные постоянно передаются. Я остановил задание ввода, потому что у меня было достаточно данных, но кажется, что таймауты работают, только если данные непрерывно передаются. Не уверен, почему это так задумано. Это усложняет написание модульных / интеграционных тестов, НО я уверен, что есть причина, почему он спроектирован таким образом. Спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...