Облачные данные и группировка данных в облачном потоке Google - PullRequest
0 голосов
/ 21 мая 2019

У меня есть конвейер, который получает поток событий от PubSub, применяя 1-часовое окно, а затем записывает их в файл в Google Cloud Storage.Недавно я понял, что иногда в 1-часовом окне происходит слишком много событий, поэтому я также добавил триггер, который срабатывает, если на панели находится более 100 000 событий.Теперь проблема в том, что ограничение в 100К запускается, только когда одна группа внутри окна превышает номер, но не весь конвейер.

Соответствующая часть конвейера выглядит следующим образом:

PCollection<String> rawEvents = pipeline
   .apply("Read PubSub Events",
       PubsubIO.readStrings()
               .fromSubscription(options.getInputSubscription()));

rawEvents
   .apply("1h Window",
       Window.<String>into(FixedWindows.of(Duration.standardHours(1))
          .triggering(
              Repeatedly
                 .forever(
                    AfterFirst.of(
                       AfterPane.elementCountAtLeast(100000),
                       AfterWatermark.pastEndOfWindow())))
                 .discardingFiredPanes()
                 .withAllowedLateness(Duration.standardDays(7), 
              Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
          .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY))
   .apply("Write File(s)", new WriteFiles(options, new EventPartitioner()));

Компонент WriteFiles - это PTransform, который расширяется до FileIO.Write, и он группирует элементы по ключу.

Как бы я мог сделать так, чтобы окно срабатывало после полногоиз 100 тыс. событий, находящихся в конвейере, а не 100 тыс. событий для конкретной группы?Заранее спасибо!

...