Недавно я столкнулся с проблемой, которая сводила меня с ума, так как это происходит только после развертывания в потоке данных, но никогда в локальной среде, где все работает безупречно.К вашему сведению, я использую Apache Beam 2.9.0
.
Я определяю шаг DoFn, который буферизует событие в течение определенного периода времени, например, 5 минут, и после этого запускает некоторую логику.
@StateId("bufferSize")
private final StateSpec<ValueState<Integer>> bufferSizeSpec =
StateSpecs.value(VarIntCoder.of());
@StateId("eventsBuffer")
private final StateSpec<BagState<String>> eventsBufferSpec =
StateSpecs.bag(StringUtf8Coder.of());
@TimerId("trigger")
private final TimerSpec triggerSpec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
У меня есть processElement
логика для добавления входящих событий ...
@ProcessElement
public void processElement(
ProcessContext processContext,
@StateId("bufferSize") ValueState<Integer> bufferSize,
@StateId("eventsBuffer") BagState<String> eventsBuffer,
@TimerId("trigger") Timer triggerTimer) {
triggerTimer.offset(Duration.standardMinutes(1)).setRelative();
int size = ObjectUtils.firstNonNull(bufferSize.read(), 0);
eventsBuffer.add(processContext.element().getValue());
bufferSize.write(++size);
}
И затем мой триггер ...
@OnTimer("trigger")
public void onExpiry(
@StateId("bufferSize") ValueState<Integer> bufferSize,
@StateId("eventsBuffer") BagState<String> eventsBuffer) throws Exception {
doSomethingHere();
}
Всякий раз, когда onExpiry
выполняется, параметры, которые он получает, равны нулю и 0.
Что может происходить в кластере?
РЕДАКТИРОВАТЬ :
Окно используется до DoFn.
.apply(
"1min Window",
Window
.<KV<String, String>>into(
FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())