Метод @OnTimer получает нулевые ссылки при срабатывании - PullRequest
0 голосов
/ 11 июля 2019

Недавно я столкнулся с проблемой, которая сводила меня с ума, так как это происходит только после развертывания в потоке данных, но никогда в локальной среде, где все работает безупречно.К вашему сведению, я использую Apache Beam 2.9.0.

Я определяю шаг DoFn, который буферизует событие в течение определенного периода времени, например, 5 минут, и после этого запускает некоторую логику.

@StateId("bufferSize")
private final StateSpec<ValueState<Integer>> bufferSizeSpec =
  StateSpecs.value(VarIntCoder.of());

@StateId("eventsBuffer")
private final StateSpec<BagState<String>> eventsBufferSpec =
  StateSpecs.bag(StringUtf8Coder.of());

@TimerId("trigger")
private final TimerSpec triggerSpec = 
  TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

У меня есть processElement логика для добавления входящих событий ...

@ProcessElement
public void processElement(
    ProcessContext processContext,
    @StateId("bufferSize") ValueState<Integer> bufferSize,
    @StateId("eventsBuffer") BagState<String> eventsBuffer,
    @TimerId("trigger") Timer triggerTimer) {

  triggerTimer.offset(Duration.standardMinutes(1)).setRelative();
  int size = ObjectUtils.firstNonNull(bufferSize.read(), 0);
  eventsBuffer.add(processContext.element().getValue());
  bufferSize.write(++size);
}

И затем мой триггер ...

@OnTimer("trigger")
public void onExpiry(
    @StateId("bufferSize") ValueState<Integer> bufferSize,
    @StateId("eventsBuffer") BagState<String> eventsBuffer) throws Exception {

  doSomethingHere();
}

Всякий раз, когда onExpiry выполняется, параметры, которые он получает, равны нулю и 0.

Что может происходить в кластере?

РЕДАКТИРОВАТЬ :

Окно используется до DoFn.

.apply(
  "1min Window",
  Window
    .<KV<String, String>>into(
        FixedWindows.of(Duration.standardMinutes(1)))
    .triggering(AfterProcessingTime
                  .pastFirstElementInPane()
                  .plusDelayOf(Duration.standardSeconds(1)))
    .withAllowedLateness(Duration.ZERO)
    .accumulatingFiredPanes())

1 Ответ

0 голосов
/ 15 июля 2019

Важно отметить, что состояние сохраняется для кортежа ключевого окна, когда окно истекает, состояние будет GC'd.

Так что для key-1 ваш объект Bag будет иметь данные для {key-1, TimeInterval-1}, {key-1, TimeInterval-2} и т. Д.

Если вы хотите сильныйСемантика между входными значениями и вашим таймером, вы можете изучить использование таймера EventTime.

...