Есть ли способ создать список SpecificRecord в преобразовании ParDo в Beam для записи файлов Parquet? - PullRequest
0 голосов
/ 06 ноября 2019

Я пытаюсь написать задание потока данных в Beam / Java, чтобы обработать серию событий, приходящих из Pub / Sub и записывающих в Parquet. События в Pub / Sub представлены в формате JSON, и каждое событие может генерировать одну или несколько строк. Мне удалось написать очень простой пример написания преобразования ParDo, возвращающего только 1 запись. ParDo выглядит следующим образом

    static class GenerateRecords extends DoFn<String, GenericRecord> {
        @ProcessElement
        public void processElement(ProcessContext context) {
            final GenericData.Record record = new GenericData.Record(schema);
            String msg = context.element();

            com.tsp.de.schema.mschema pRecord = GenerateParquetRecord(msg);


            context.output(pRecord);
        }
    }

и часть записи конвейера

                .apply("Write to file",
                FileIO.<GenericRecord>
                        write()
                        .via(
                                ParquetIO.sink(schema)
                                        .withCompressionCodec(CompressionCodecName.SNAPPY)
                        )
                        .to(options.getOutputDirectory())
                        .withNumShards(options.getNumShards())
                        .withSuffix("pfile")
                );

Мой вопрос: как мне обобщить это преобразование ParDo, чтобы получить список записей? Я попытался List, но это не работает, ParquetIO.sink (схема) лает на "не удается разрешить метод через".

1 Ответ

1 голос
/ 06 ноября 2019

Вы можете вызывать context.output() в вашем DoFn столько раз, сколько вам нужно. Итак, если вы знаете бизнес-логику, при каких обстоятельствах вам нужно выдать несколько записей, вам просто нужно вызвать context.output(record) для каждой выходной записи. Это должно быть проще, чем иметь PCollection контейнеров.

PS: Кстати, у меня есть простой пример того, как писать GenericRecord s с ParquetIO и AvroCoder это может быть полезно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...