Облачный поток данных Watermark застрял и увеличивается системное отставание - PullRequest
2 голосов
/ 18 июня 2019

Я читаю записи из темы PubSub в конвейере потока данных. Записи PubSub делятся на фиксированные окна и затем группируются в каждом окне. Каждое окно сортируется по порядковому номеру, так как нам нужно обрабатывать эти записи по порядку, используя beam.SortValues. Затем я записываю записи в Cloud BigTable

Проблема с конвейером - свежесть данных и системная задержка. Кажется, что свежесть данных застряла в какой-то момент, и водяной знак перестает развиваться.

Я использую следующую стратегию управления окнами для создания записей после шага GroupByKey:

PCollection<KV<BigInteger, JSONObject>> window = pubsubRecords.apply("Raw to String", ParDo.of(new LogsFn()))
                .apply("Window", Window
                        .<KV<BigInteger, JSONObject>>into(FixedWindows.of(Duration.standardSeconds(10)))
                        .triggering(Repeatedly.forever(AfterFirst.of(
      AfterPane.elementCountAtLeast(500),
      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1)))))
                        .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
                    );

Я думаю, что проблема может быть в стратегии управления окнами. В основном я хочу сделать следующее: чтение записей из PubSub в FixedWindows за 1 минуту, сортировка окна и запись в BigTable. Если я использую триггер defualt, шаг GroupByKey не выдаст никаких результатов. Может ли кто-нибудь помочь мне с этим?

1 Ответ

0 голосов
/ 05 июля 2019

Читая ваш код, прямо сейчас похоже, что ваш ранний триггер и размер окна задом наперед.Ваша стратегия управления окнами на самом деле:

  1. 10 секунд фиксированное окно времени события
  2. Составной ранний триггер либо 1 минута времени обработки, либо 500 элементов на панели.
  3. Поздние события отбрасываются.

Если вы просто хотите, чтобы окна продолжительности события составляли 1 минуту, вот что вам нужно:

PCollection<KV<BigInteger, JSONObject>> window = pubsubRecords.apply("Raw to String", ParDo.of(new LogsFn()))
            .apply("Window", Window
            .<KV<BigInteger, JSONObject>>into(FixedWindows.of(Duration.standardMinutes(1)))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes()
                .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS));

Пожар всегда является значением по умолчанию OnTimeBehavior, но мы можем сделать это явным для удобства чтения.Если вам нужен составной триггер, вы можете добавить его обратно - я подозреваю, что вы хотели активировать одну 10 секунд или 500 элементов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...