Spark несколько водяных знаков с произвольным состоянием - PullRequest
0 голосов
/ 30 сентября 2019

Существует искровое приложение с двумя агрегирующими шагами, за которыми следует внутреннее соединение.

Агрегатные шаги содержат функцию, которая выдает данные только тогда, когда время истекает:

def getSession[A, B](sessionId: String,
                 inputs: Iterator[A],
                 oldState: GroupState[A]): Iterator[B] = {
  if (oldState.hasTimedOut) {
    oldState.remove()
    val finalState: B = ??? //get this when the event expires
    Iterator(finalState)
  }
  else {
    val aggrState: A = ???
    oldState.update(aggrState)
    val latestTimestamp: Long = ???
    oldState.setTimeoutTimestamp(latestTimestamp, "5 seconds")
    Iterator()
  }
}

Этоиспользуется в двух потоках:

val inputStreamOne: Dataset[A] = ???
val inputStreamTwo: Dataset[A] = ???

val aggrInputStreamOneDF = {
  inputStreamOne
    .withWatermark("eventTimestamp", "2 minutes")
    .groupByKey(_.sessionId)
    .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.EventTimeTimeout)(getSession)
}
val aggrInputStreamOneDF = {
  inputStreamTwo
    .withWatermark("eventTimestamp", "2 minutes")
    .groupByKey(_.sessionId)
    .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.EventTimeTimeout)(getSession)
}

Эти потоки объединены:

val joinedStream = {
  inputStreamOne.withWatermark("inputStreamOneEventTimestamp", "4 minutes").join(
    inputStreamTwo.withWatermark("inputStreamTwoEventTimestamp", "4 minutes"),
    functions.expr(
      """
        |inputStreamOneSessionId = inputStreamTwoSessionId AND
        |inputStreamOneEventTimestamp >= inputStreamTwoEventTimestamp AND
        |inputStreamOneEventTimestamp <= inputStreamTwoEventTimestamp  + interval 10 seconds
        |""".stripMargin),
    "inner")
}

Проблема, которую мы обнаружили, заключается в том, что Spark не может установить несколько водяных знаков (в данном случаедля водяного знака установлено значение 2 минуты для всех шагов), к тому моменту, когда первый шаг будет генерировать данные, срок действия водяного знака в соединении также истечет, поскольку первый шаг генерирует данные только после истечения срока действия водяного знака.

Возможно ли этообъединять запросы с несколькими водяными знаками при использовании произвольной операции с состоянием?

...