Есть ли триггер потока данных, который срабатывает после завершения сеанса окна? - PullRequest
0 голосов
/ 28 августа 2018

Я использую Google Cloud PubSub и Dataflow для обработки моих данных. Я хотел бы определить, когда мой ежедневный процесс завершен, другими словами, когда оконная сессия завершена / достигнута длительность перерыва. Есть ли один триггер, который может быть запущен для этого случая? Если нет, могу ли я обойти это решение?

    Pipeline p = Pipeline.create(options);
    p.apply("ReadPubSubMessage", PubsubIO.readMessages().fromSubscription("projects/project-id/subscriptions/my-sub"))
            .apply("ApplyTimestamps", WithTimestamps.of((PubsubMessage pubSub) -> new Instant(System.currentTimeMillis())))
            .apply("SessionWindowing", Window.<PubsubMessage>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
                    .triggering(?)
                    .withAllowedLateness(Duration.standardSeconds(30))
                    .discardingFiredPanes())
                    .apply(new CountWords())

Извините, если я пропустил что-то очевидное в документации.

Ответы [ 2 ]

0 голосов
/ 28 июня 2019

Да, вы можете использовать триггер DefaultTrigger.of (), ниже приведен пример кода. Обратите внимание, что он не работает в DirectRunner, но будет работать в потоке данных Google.

PCollection<KV<String, FormMessageMeta>> formMetaSessionWindowCollection = formMessageMetaKvCollection.apply(
            "Session-Window",
            Window.<KV<String, FormMessageMeta>>into(
                    Sessions.withGapDuration(Duration.standardMinutes(40)))
            .triggering(DefaultTrigger.of()) 
            .withAllowedLateness(Duration.ZERO).accumulatingFiredPanes());

Окно сеанса может быть применено только к KV.

0 голосов
/ 28 августа 2018

Если я правильно понимаю ваш сценарий, ваше окно закроется, когда будет достигнута длительность паузы (за определение сеансов ). По этой причине вы можете использовать Стандартный триггер , поскольку у вас ограниченные окна, он будет срабатывать только один раз. Имеет ли это смысл?

Здесь вы можете найти официальную документацию DefaultTrigger .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...