Есть ли способ ввести и конфигурации в ParDo без sideInput? - PullRequest
1 голос
/ 04 июня 2019

У меня есть ParDo, который использует состояние и таймеры с периодически обновляемым PcollectionView в качестве sideInput для этого parDo; Google dataflow выдает исключение, что таймеры не допускаются в таком случае. Есть ли другой способ передачи данных конфигурации в parDo без sideInput? По сути, sideInput представлял собой карту данных конфигурации, которая считывала данные из хранилища данных каждые 24 часа.

В настоящее время я пытаюсь выяснить, могу ли я создать ParDo перед тем, как использовать состояние и таймеры, для периодического обновления конфигурации, но я не вижу, как мы можем получить доступ к этой карте из следующего ParDo. Есть предложения?

Примечание. Этот конвейер работает в потоковом режиме с глобальным окном и читает сообщения Pubsub по мере их поступления. Хранилище данных используется для хранения данных, необходимых для принятия решения о том, когда выводить элемент в тему pubsub.

1 Ответ

0 голосов
/ 05 июня 2019

Вместо использования таймеров состояния для обновления бокового ввода вы можете использовать фиксированное окно для периодического обновления вашего PCollectionView с вашим источником данных:

        PCollectionView<Map<String,String>> sideInput = pipeline
                .apply(notifications)
                .apply(
                        Window.<Long>into(FixedWindows.of(Duration.standardMinutes(refreshMinutes)))
                                .triggering(
                                        Repeatedly.forever(AfterPane.elementCountAtLeast(1))
                                )
                                .withAllowedLateness(Duration.ZERO)
                                .discardingFiredPanes()
                )
                .apply( /* query data source */ )
                .apply(View.<Map<String,String>>asSingleton());
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...