Как читать CSVRecord в Apache Beam? - PullRequest
       33

Как читать CSVRecord в Apache Beam?

0 голосов
/ 25 февраля 2019

У меня есть Java Iterable объект, Iterable records.И я хочу передать его на конвейер Beam.Я пытался

PCollection csvRecordPC = p.apply ("Создать коллекцию", Create.of (записи));

Это вызвало ошибку

Возникла исключительная ситуация при выполненииКласс JavaНе удается определить кодер по умолчанию для PTransform «Создать», в котором нет элементов.Добавьте элементы, вызовите Create.empty (Coder), Create.empty (TypeDescriptor) или вызовите withCoder (Coder) или withType (TypeDescriptor) в PTransform.

Какой кодер мне использовать?Или как мне написать свой кодер?

1 Ответ

0 голосов
/ 01 марта 2019

Я нашел решение, используя FileIO.

p.apply(FileIO.match().filepattern(options.getInputFile()))
 .apply(FileIO.readMatches())
 .apply(ParDo.of(new CsvParser())) 

CsvPaser () равен

public class CsvParser extends DoFn<ReadableFile, CSVRecord> {
    @DoFn.ProcessElement
    public void processElement(@Element ReadableFile element, DoFn.OutputReceiver<CSVRecord> receiver) throws IOException {
        InputStream is = Channels.newInputStream(element.open());

        Reader reader = new InputStreamReader(is);

        Iterable<CSVRecord> records = CSVFormat.EXCEL.withFirstRecordAsHeader().parse(reader);

        for (CSVRecord record : records) {
            receiver.output(record);
        }
    }
}
...