Существует искровое приложение с двумя агрегирующими шагами, за которыми следует внутреннее соединение.
Агрегатные шаги содержат функцию, которая выдает данные только тогда, когда время истекает:
def getSession[A, B](sessionId: String,
inputs: Iterator[A],
oldState: GroupState[A]): Iterator[B] = {
if (oldState.hasTimedOut) {
oldState.remove()
val finalState: B = ??? //get this when the event expires
Iterator(finalState)
}
else {
val aggrState: A = ???
oldState.update(aggrState)
val latestTimestamp: Long = ???
oldState.setTimeoutTimestamp(latestTimestamp, "5 seconds")
Iterator()
}
}
Этоиспользуется в двух потоках:
val inputStreamOne: Dataset[A] = ???
val inputStreamTwo: Dataset[A] = ???
val aggrInputStreamOneDF = {
inputStreamOne
.withWatermark("eventTimestamp", "2 minutes")
.groupByKey(_.sessionId)
.flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.EventTimeTimeout)(getSession)
}
val aggrInputStreamOneDF = {
inputStreamTwo
.withWatermark("eventTimestamp", "2 minutes")
.groupByKey(_.sessionId)
.flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.EventTimeTimeout)(getSession)
}
Эти потоки объединены:
val joinedStream = {
inputStreamOne.withWatermark("inputStreamOneEventTimestamp", "4 minutes").join(
inputStreamTwo.withWatermark("inputStreamTwoEventTimestamp", "4 minutes"),
functions.expr(
"""
|inputStreamOneSessionId = inputStreamTwoSessionId AND
|inputStreamOneEventTimestamp >= inputStreamTwoEventTimestamp AND
|inputStreamOneEventTimestamp <= inputStreamTwoEventTimestamp + interval 10 seconds
|""".stripMargin),
"inner")
}
Проблема, которую мы обнаружили, заключается в том, что Spark не может установить несколько водяных знаков (в данном случаедля водяного знака установлено значение 2 минуты для всех шагов), к тому моменту, когда первый шаг будет генерировать данные, срок действия водяного знака в соединении также истечет, поскольку первый шаг генерирует данные только после истечения срока действия водяного знака.
Возможно ли этообъединять запросы с несколькими водяными знаками при использовании произвольной операции с состоянием?