Я пытаюсь написать задание потока данных в Beam / Java, чтобы обработать серию событий, приходящих из Pub / Sub и записывающих в Parquet. События в Pub / Sub представлены в формате JSON, и каждое событие может генерировать одну или несколько строк. Мне удалось написать очень простой пример написания преобразования ParDo, возвращающего только 1 запись. ParDo выглядит следующим образом
static class GenerateRecords extends DoFn<String, GenericRecord> {
@ProcessElement
public void processElement(ProcessContext context) {
final GenericData.Record record = new GenericData.Record(schema);
String msg = context.element();
com.tsp.de.schema.mschema pRecord = GenerateParquetRecord(msg);
context.output(pRecord);
}
}
и часть записи конвейера
.apply("Write to file",
FileIO.<GenericRecord>
write()
.via(
ParquetIO.sink(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
)
.to(options.getOutputDirectory())
.withNumShards(options.getNumShards())
.withSuffix("pfile")
);
Мой вопрос: как мне обобщить это преобразование ParDo, чтобы получить список записей? Я попытался List, но это не работает, ParquetIO.sink (схема) лает на "не удается разрешить метод через".