Запрос схемы Avro с использованием Beam SQL - PullRequest
1 голос
/ 28 октября 2019

Я пытаюсь читать файлы avro с помощью Apache Beam и использовать Beam SQL для преобразования данных.

Я все еще новичок в Beam и Java. Вот мой простой код:

public class BeamSQLReadAvro {
    @SuppressWarnings("serial")
    public static void main(String[] args) throws IOException {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline p = Pipeline.create(options);

        /* Schema definition */
        Schema schema = new Schema.Parser().parse(new File("data/RATE_CODE/RATE_CODE.avsc"));

        /* Create record/row */
        PCollection<GenericRecord> records = p.apply(AvroIO.readGenericRecords(schema).from("data/RATE_CODE/*.avro"));

        /* SQL Transform */
        records.apply("SQL Transform 01",SqlTransform.query("SELECT RCODE,RNAME,RDESC FROM PCOLLECTION LIMIT 10"))

        /* Print output */
               .apply("Output",
                      MapElements.via(
                        new SimpleFunction<Row, Row>() {
                          @Override
                          public Row apply(Row input) {
                            System.out.println("PCOLLECTION: " + input.getValues());
                            return input;
                          }
                        }
                      )
               );
        p.run().waitUntilFinish();
    }
}

это дает мне ошибку

Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema

Я не понимаю, я определил переменную под названием схема. Здесь есть указатели?

1 Ответ

3 голосов
/ 28 октября 2019

На самом деле в вашем конвейере есть два типа схем - схемы Avro и Beam. Схема Avro используется для анализа входных записей Avro, но для преобразования SQL вы должны использовать строки со схемой Beam. Для этого AvroIO предоставляет опцию withBeamSchemas(boolean), которая в вашем случае должна быть установлена ​​на true, например:

AvroIO.readGenericRecords(schema).withBeamSchemas(true).from("data/RATE_CODE/*.avro")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...