Записывать в облачное хранилище каждые X сообщений от Pubsub - PullRequest
0 голосов
/ 05 апреля 2020

Я новичок в Cloud Dataflow / Apache Beam, поэтому концепция / программирование для меня все еще неясна.

Я хочу сделать так, чтобы Dataflow слушал Pubsub и получал сообщения этого формата в JSON:

{
  "productId": "...",
  "productName": "..."
}

И преобразовать это в:

{
  "productId": "...",
  "productName": "...",
  "sku": "...",
  "inventory": {
    "revenue": <some Double>,
    "stocks":  <some Integer>
  }
}

Итак, необходимо выполнить следующие шаги:

  1. ( IngestFromPubsub ) Получение записей из Pubsub путем прослушивания topi c (1 сообщение Pubsub = 1 запись)

  2. ( EnrichDataFromAPI )

    а. Десериализовать строку JSON полезной нагрузки в Java объект

    b. Вызвав внешний API, используя sku, я могу обогатить данные каждой записи, добавив атрибут inventory.

    c. Повторно сериализуйте записи.

  3. ( WriteToGCS ) Затем каждые x число (может быть параметризовано) записей, мне нужно записать их в облачное хранилище. Пожалуйста, рассмотрите также тривиальный случай, x=1. (x=1, хорошая идея? Боюсь, будет слишком много записей в облачном хранилище)

Несмотря на то, что я парень Python, у меня уже есть трудности с выполнением это в Python, более того, мне нужно написать в Java. Я чувствую головную боль, читая пример Beam в Java, он слишком многословен и труден для понимания. Все, что я понимаю, это то, что каждый шаг - это .apply для PCollection.

Итак, вот результат моего маленького усилия:

public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
        // I don't really understand the next part, I just copied from official documentation and filled in some values
        .apply(Window.<String>into(FixedWindows.of(Duration.millis(5000)))
            .withAllowedLateness(Duration.millis(5000))
            .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1000)))
            .discardingFiredPanes()
        )
        .apply("EnrichDataFromAPI", ParDo.of(
            new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.element();
                    // help on this part, I heard I need to use Jackson but I don't know, for API HttpClient is sufficient
                    // ... deserialize, call API, serialize again ...
                    c.output(enrichedJSONString);
                }
            }
        ))
        .apply("WriteToGCS", 
            TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()))
    ;


    PipelineResult result = pipeline.run();
}

Пожалуйста, заполните недостающие части, а также дайте мне совет о работе с окнами (например, какова соответствующая конфигурация и т. д. c.) и в каких шагах я должен вставить / применить ее.

1 Ответ

2 голосов
/ 07 апреля 2020
  • Не думаю, что вам нужны какие-либо окна в ваших IngestFromPubsub и EnrichDataFromAPI. Цель создания окон состоит в том, чтобы сгруппировать ваши записи, находящиеся рядом во времени, в windows, чтобы вы могли вычислять совокупные вычисления по ним. Но поскольку вы не выполняете каких-либо агрегатных вычислений и заинтересованы в работе с каждой записью независимо, вам не нужно windows.

  • , поскольку вы всегда конвертируете одну входную запись в одна выходная запись, ваш EnrichDataFromAPI должен быть MapElements. Это должно упростить код.

  • Существуют ресурсы для обработки JSON в Apache Bean Java: Apache Обработка потока пучка json data

  • Вам не обязательно использовать Джексона для сопоставления JSON с Java объектом. Возможно, вы сможете напрямую манипулировать JSON. Вы можете использовать Java native JSON API для анализа / манипулирования / сериализации.

...