Я пытаюсь объединить (на ключ) источник потоковых данных в Apache Beam (через Scio), используя DoFn с отслеживанием состояния (используя @ProcessElement
с @StateId
ValueState
элементами). Я думал, что это будет наиболее подходящим для проблемы, которую я пытаюсь решить. Требования:
- для данного ключа, записи агрегированы (по существу, суммированы) за все время - Мне не нужны ранее вычисленные агрегаты, только самые последние
- ключей могут быть исключены из состояния (
state.clear()
) на основании определенных условий, которыми я управляю - Каждые 5 минут, независимо от того, были ли обнаружены какие-либо новые ключи , все ключи , которые не были исключены из состояния , должны быть выведены
Учитывая, что это потоковый конвейер и будет работать бесконечно, используя combinePerKey
По всему глобальному окну с накапливающимися запущенными панелями кажется, что он продолжит увеличивать объем памяти и объем данных, которые ему нужны для работы с течением времени, поэтому я бы хотел этого избежать. Кроме того, при тестировании этого (возможно, как и ожидалось) он просто добавляет вновь вычисленные агрегаты к выводу вместе с историческим вводом, а не использует последнее значение для каждого ключа.
Я думал, что при использовании StatefulDoFn просто позволил бы мне выводить все глобальное состояние до сих пор (), но, похоже, это не тривиальное решение. Я видел намеки на использование таймеров для искусственного выполнения обратных вызовов для этого, а также на возможном использовании медленно растущей боковой входной карты ( Как решить исключение Duplicate values при создании PCollectionView > ) и как-то сбрасывать это, но это, по сути, потребует итерации по всем значениям на карте, а не присоединения к ней.
Мне кажется, что я могу пропустить что-то простое, чтобы заставить это работать. Я относительно новичок во многих концепциях управления окнами и таймерами в Beam, ищу любой совет о том, как решить эту проблему. Спасибо!
* ОБНОВЛЕНИЕ *
У меня есть закодированная концепция, но я замечаю, что даже при сбросе вывода с помощью команды do fn с отслеживанием состояния (я вижу вывод в журналах и состояния отладки в пользовательском интерфейсе потока данных) данные не выводятся ...:
Stateful Do Fn
// DomainState is a custom case class I'm using
type DoFnT = DoFn[KV[String, DomainState], KV[String, DomainState]]
class StatefulDoFn extends DoFnT {
@StateId("key")
private val keySpec = StateSpecs.value[String]()
@StateId("domainState")
private val domainStateSpec = StateSpecs.value[DomainState]()
@TimerId("loopingTimer")
private val loopingTimer: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)
@ProcessElement
def process(
context: DoFnT#ProcessContext,
@StateId("key") stateKey: ValueState[String],
@StateId("domainState") stateValue: ValueState[DomainState],
@TimerId("loopingTimer") loopingTimer: Timer): Unit = {
... logic to create key/value
if (keepState(value)) {
loopingTimer.offset(Duration.standardSeconds(150)).setRelative()
stateKey.write(key)
stateValue.write(value)
if (flushState(value)) {
context.output(KV.of(key, value))
}
} else {
stateValue.clear()
}
}
@OnTimer("loopingTimer")
def onLoopingTimer(
context: DoFnT#OnTimerContext,
@StateId("key") stateKey: ValueState[String],
@StateId("domainState") stateValue: ValueState[DomainState],
@TimerId("loopingTimer") loopingTimer: Timer): Unit = {
... logic to create key/value checking for nulls
if (keepState(value)) {
loopingTimer.offset(Duration.standardSeconds(150)).setRelative()
if (flushState(value)) {
context.output(KV.of(key, value))
}
}
}
}
С конвейером
sc
.pubsubSubscription(...)
.keyBy(...)
.withGlobalWindow(
WindowOptions(
trigger = AfterPane.elementCountAtLeast(1)))
.applyPerKeyDoFn(new StatefulDoFn())
.withFixedWindows(
duration = Duration.standardMinutes(5),
options = WindowOptions(
accumulationMode = DISCARDING_FIRED_PANES,
trigger = AfterWatermark.pastEndOfWindow(),
allowedLateness = Duration.ZERO,
// Only take the latest per key during a window
timestampCombiner = TimestampCombiner.LATEST
))
.saveAsCustomOutput(TextIO.write()...)