Поток данных, записывающий pCollection GenericRecords в файлы Parquet - PullRequest
0 голосов
/ 22 января 2020

В apache шаге луча У меня PCollection KV<String, Iterable<KV<Long, GenericRecord>>>>. Я хочу записать все записи итерируемого в один и тот же файл паркета. Мой фрагмент кода приведен ниже

p.apply(ParDo.of(new MapWithAvroSchemaAndConvertToGenericRecord())) // PCollection<GenericRecord>
.apply(ParDo.of(new MapKafkaGenericRecordValue(formatter, options.getFileNameDelimiter()))) //PCollection<KV<String, KV<Long, GenericRecord>>>
.apply(GroupByKey.create()) //PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>>

. Теперь я хочу записать все записи в Iterable в один и тот же файл паркета (получить имя файла по ключу KV).

1 Ответ

0 голосов
/ 27 января 2020

Я нашел решение проблемы. на шаге -

apply(GroupByKey.create()) //PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>>

я применю другое преобразование, которое вернет только Iterable в качестве выходной pCollection. `.apply (ParDo.of (new GetIterable ())) // PCollection >> где ключ - это имя файла, в который я должен записать. тогда оставшийся фрагмент будет

.apply(Flatten.iterables())
                .apply(
                        FileIO.<String, KV<String, GenericRecord>>writeDynamic()
                                .by((SerializableFunction<KV<String, GenericRecord>, String>) KV::getKey)
                                .via(
                                        Contextful.fn(
                                                (SerializableFunction<KV<String, GenericRecord>, GenericRecord>) KV::getValue
                                        ),
                                        ParquetIO.sink(schema)
                                                .withCompressionCodec(CompressionCodecName.SNAPPY)


                                )

                                .withTempDirectory("/tmp/temp-beam")
                                .to(options.getGCSBucketUrl())
                                .withNumShards(1)
                                .withDestinationCoder(StringUtf8Coder.of())
                )
...