Проблема схемы Apache Beam SqlTransforms - PullRequest
0 голосов
/ 25 октября 2018

Я пытаюсь выполнить ETL, который включает в себя загрузку файлов из HDFS, применение преобразований и запись их в Hive.При использовании SqlTransforms для выполнения преобразований, следующих за этим документом, я сталкиваюсь с проблемой ниже.Не могли бы вы помочь?

java.lang.IllegalStateException: Cannot call getSchema when there is no schema
    at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
    at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.toTableMap(SqlTransform.java:105)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:90)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:77)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:339)
    at org.apache.beam.examples.SqlTest.runSqlTest(SqlTest.java:107)
    at org.apache.beam.examples.SqlTest.main(SqlTest.java:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)

Фрагмент кода:

PCollection<String> data = p.apply("ReadLines", TextIO.read().from(options.getInputFile()));

    if(options.getOutput().equals("hive")){
        Schema hiveTableSchema = Schema.builder()
                .addStringField("eid")
                .addStringField("name")
                .addStringField("salary")
                .addStringField("destination")
                .build();
          data.apply(ParDo.of(new DoFn<String, Row>() {
              @ProcessElement
              public void processElement(@Element String input, OutputReceiver<Row> out){
                  String[] values = input.split(",");
                  System.out.println(values);
                  Row row = Row.withSchema(hiveTableSchema)
                                .addValues(values)
                                .build();
                  out.output(row);
              }
          })).apply(SqlTransform.query("select eid, destination from PCOLLECTION"))

                .apply(ParDo.of(new DoFn<Row, HCatRecord>() {
                    @ProcessElement
                    public void processElement(@Element Row input, OutputReceiver<HCatRecord> out){
                        HCatRecord record = new DefaultHCatRecord(input.getFieldCount());
                        for(int i=0; i < input.getFieldCount(); i++){
                            record.set(i, input.getString(i));
                        }
                        out.output(record);
                    }
                        }))
                .apply("WriteData", HCatalogIO.write()
                        .withConfigProperties(configProperties)
                        .withDatabase("wmrpoc")
                        .withTable(options.getOutputTableName()));

1 Ответ

0 голосов
/ 25 октября 2018

Похоже, вам нужно установить схему на PCollection.В прохождении, которое вы связали, есть Create...withCoder(), который справится с этим.В вашем случае схема не может быть выведена из вашего ParDo, единственная информация, которую потенциально может увидеть Beam, - это то, что он выводит элементы типа Row, но нет никакой доступной ему информации о том, придерживается ли ваш ParDo хотя бы одинсхема для всех выходов.Поэтому вам нужно позвонить pcollection.setRowSchema(), прежде чем применять SqlTransform, чтобы сообщить Beam, какую схему вы планируете использовать при конвертации ParDo.

Обновление

И похоже, что большая часть того, что вы делаете до HCatalog, вероятно, в конечном итоге сильно упростится, например, представьте, что вам нужно указать что-то вроде pipeline.apply(TextIO.readCsvRows(schema)).apply(sqlTransform)....Фактически Beam SQL поддерживает чтение CSV-файлов без дополнительного преобразования ParDos (через TextTableProvider), но пока не подключено до SqlTransform и доступно только через Beam SQL CLI

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...