Я новичок в Cloud Dataflow / Apache Beam, поэтому концепция / программирование для меня все еще неясна.
Я хочу сделать так, чтобы Dataflow слушал Pubsub и получал сообщения этого формата в JSON:
{
"productId": "...",
"productName": "..."
}
И преобразовать это в:
{
"productId": "...",
"productName": "...",
"sku": "...",
"inventory": {
"revenue": <some Double>,
"stocks": <some Integer>
}
}
Итак, необходимо выполнить следующие шаги:
( IngestFromPubsub ) Получение записей из Pubsub путем прослушивания topi c (1 сообщение Pubsub = 1 запись)
( EnrichDataFromAPI )
а. Десериализовать строку JSON полезной нагрузки в Java объект
b. Вызвав внешний API, используя sku
, я могу обогатить данные каждой записи, добавив атрибут inventory
.
c. Повторно сериализуйте записи.
( WriteToGCS ) Затем каждые x
число (может быть параметризовано) записей, мне нужно записать их в облачное хранилище. Пожалуйста, рассмотрите также тривиальный случай, x=1
. (x=1
, хорошая идея? Боюсь, будет слишком много записей в облачном хранилище)
Несмотря на то, что я парень Python, у меня уже есть трудности с выполнением это в Python, более того, мне нужно написать в Java. Я чувствую головную боль, читая пример Beam в Java, он слишком многословен и труден для понимания. Все, что я понимаю, это то, что каждый шаг - это .apply
для PCollection.
Итак, вот результат моего маленького усилия:
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
// I don't really understand the next part, I just copied from official documentation and filled in some values
.apply(Window.<String>into(FixedWindows.of(Duration.millis(5000)))
.withAllowedLateness(Duration.millis(5000))
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1000)))
.discardingFiredPanes()
)
.apply("EnrichDataFromAPI", ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.element();
// help on this part, I heard I need to use Jackson but I don't know, for API HttpClient is sufficient
// ... deserialize, call API, serialize again ...
c.output(enrichedJSONString);
}
}
))
.apply("WriteToGCS",
TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()))
;
PipelineResult result = pipeline.run();
}
Пожалуйста, заполните недостающие части, а также дайте мне совет о работе с окнами (например, какова соответствующая конфигурация и т. д. c.) и в каких шагах я должен вставить / применить ее.