Как использовать Apache соединитель луча без работы внутри трубопровода - PullRequest
0 голосов
/ 20 апреля 2020

Мы запускаем нашу программу в модуле kubernetes, который прослушивает сообщение pubsub. В зависимости от типа данных сообщения запускается задание потока данных. И как только выполнение задания заканчивается, мы снова отправляем сообщение pubsub в другую систему.

Конвейер запускается в пакетном режиме и читает из GCS и после обработки записывает в GCS.

Pipeline pipeline = Pipeline.create(options);
PCollection<String> read = pipeline
                .apply("Read from GCS",
                        TextIO.read().from("GCS_PATH").withCompression(Compression.GZIP));

//process 
// write to GCS
....
PipelineResult result = pipeline.run();
result.waitUntilFinish();

# send job completed message to Pubsub to other component
....
....

Как я должен отправить событие другим компонентам в системе. На данный момент я использую клиентскую библиотеку Pubsbub java для сообщения pu sh в pubsub.

Есть ли способ, я могу использовать соединитель Pubsub apache для отправки сообщения, как показано ниже - или что такое правильный способ сделать то же самое

PubsubIO.writeMessages().to("topicName");

1 Ответ

1 голос
/ 20 апреля 2020

Для решения этого варианта использования вы можете использовать API ожидания. Подробности можно найти здесь

 PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
 data.apply(Wait.on(firstWriteResults))
     // Windows of this intermediate PCollection will be processed no earlier than when
     // the respective window of firstWriteResults closes.
     .apply(ParDo.of(...write to second database...));
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...