Чтение числового типа данных BigQuery из таблицы с использованием класса SchemaAndRecord - PullRequest
1 голос
/ 23 мая 2019

При разработке своего кода я использовал приведенный ниже фрагмент кода для чтения данных таблицы из BigQuery.

PCollection<ReasonCode> gpseEftReasonCodes = input.
                apply("Reading xxyyzz",
                        BigQueryIO.read(new 
                                ReadTable<ReasonCode>(ReasonCode.class))
                        .withoutValidation().withTemplateCompatibility()
                        .fromQuery("Select * from dataset.xxyyzz").usingStandardSql()
                        .withCoder(SerializableCoder.of(xxyyzz.class))

Read Table Class:

@DefaultSchema(JavaBeanSchema.class)
public class ReadTable<T> implements SerializableFunction<SchemaAndRecord, T> {
    private static final long serialVersionUID = 1L;
    private static Gson gson = new Gson();
    public static final Logger LOG = LoggerFactory.getLogger(ReadTable.class);
    private final Counter countingRecords = Metrics.counter(ReadTable.class,"Reading Records EFT Report");
    private  Class<T> class1;

    public ReadTable(Class<T> class1) {
        this.class1 = class1;
    }

    public T apply(SchemaAndRecord schemaAndRecord) {
        Map<String, String> mapping = new HashMap<>();
        int counter = 0;
        try {

            GenericRecord s = schemaAndRecord.getRecord();
            org.apache.avro.Schema s1 = s.getSchema();
            for (Field f : s1.getFields()) {
                counter++;
                mapping.put(f.name(), null==s.get(f.name())?null:String.valueOf(s.get(counter)));
            }
            countingRecords.inc();
            JsonElement jsonElement = gson.toJsonTree(mapping);
            return gson.fromJson(jsonElement, class1);
        }catch(Exception mp) {
            LOG.error("Found Wrong Mapping for the Record: "+mapping);
            mp.printStackTrace();
            return null;
        }
    }
}

Итак, после чтения данных из BigqueryЯ отображал данные из SchemaAndRecord в pojo. Я получал значение для столбцов, тип данных которых указан ниже в числовом формате.

 last_update_amount=java.nio.HeapByteBuffer[pos=0 lim=16 cap=16]

Я ожидал, что получу точное значение, но при получении буфера HyperByte используется версия, которую я использую:Апач луч 2.12.0.Если вам нужна дополнительная информация, пожалуйста, дайте мне знать.

Способ 2 Попытка:

GenericRecord s = schemaAndRecord.getRecord();

            org.apache.avro.Schema s1 = s.getSchema();
            for (Field f : s1.getFields()) {
                counter++;      
                mapping.put(f.name(), null==s.get(f.name())?null:String.valueOf(s.get(counter)));
                if(f.name().equalsIgnoreCase("reason_code_id")) {
                    BigDecimal numericValue =
                            new Conversions.DecimalConversion()
                                .fromBytes((ByteBuffer)s.get(f.name()) , Schema.create(s1.getType()), s1.getLogicalType());
                    System.out.println("Numeric Con"+numericValue);
                }
                else {
                        System.out.println("Else Condition "+f.name());
                }
            }
            ```

Facing Issue:

2019-05-24 (14:10:37) org.apache.avro.AvroRuntimeException: Can't create a: RECORD


Way 2:

    GenericRecord s = schemaAndRecord.getRecord();

                org.apache.avro.Schema s1 = s.getSchema();
                for (Field f : s1.getFields()) {
                    counter++;      
                    mapping.put(f.name(), null==s.get(f.name())?null:String.valueOf(s.get(counter)));
                    if(f.name().equalsIgnoreCase("reason_code_id")) {
                        BigDecimal numericValue =
                                new Conversions.DecimalConversion()
                                    .fromBytes((ByteBuffer)s.get(f.name()) , Schema.create(s1.getType()), s1.getLogicalType());
                        System.out.println("Numeric Con"+numericValue);
                    }
                    else {
                            System.out.println("Else Condition "+f.name());
                    }
                }
                ```

    Facing Issue:

    2019-05-24 (14:10:37) org.apache.avro.AvroRuntimeException: Can't create a: RECORD

    StackTrace

    java.io.IOException: Failed to start reading from source: gs://trusted-bucket/mgp/temp/BigQueryExtractTemp/3a5365f1e53d4dd393f0eda15a2c6bd4/000000000000.avro range [0, 65461)
        at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:596)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
        at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:411)
        at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:380)
        at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:306)
        at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
        at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
        at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.avro.AvroRuntimeException: Can't create a: RECORD
        at org.apache.avro.Schema.create(Schema.java:120)
        at com.globalpay.WelcomeEmail.mapRecordToObject(WelcomeEmail.java:118)
        at com.globalpay.WelcomeEmail.access$0(WelcomeEmail.java:112)
        at com.globalpay.WelcomeEmail$1.apply(WelcomeEmail.java:54)
        at com.globalpay.WelcomeEmail$1.apply(WelcomeEmail.java:1)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:221)
        at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase$1.apply(BigQuerySourceBase.java:214)
        at org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord(AvroSource.java:567)
        at org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord(BlockBasedSource.java:209)
        at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484)
        at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479)
        at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
        at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:593)
        ... 14 more

1 Ответ

0 голосов
/ 11 июня 2019

Общий подход правильный.Трудно понять, что именно не так.Пожалуйста, вставьте полные следы стека, если это возможно.Также посмотрите на примеры использования BigQueryIO.read(), они могут помочь: https://beam.apache.org/releases/javadoc/2.13.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html

И вместо read() вы можете использовать readTableRows() вместо этого и получить проанализированные значения.Или следуйте реализации TableRowParser для примера того, как такой парсер будет работать (он используется в readTableRows()): https://github.com/apache/beam/blob/79d478a83be221461add1501e218b9a4308f9ec8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L449

Также вы можете взглянуть на фрагменты здесь: https://github.com/apache/beam/blob/77cf84c634381495d45a112a9d147ad69394c0d4/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java#L168

Обновление

По-видимому, недавно была добавлена ​​возможность чтения строк с помощью Beam-схем: https://github.com/apache/beam/pull/8620

Вы должны быть в состоянии что-то сделать в этихлинии сейчас:

p.apply(BigQueryIO.readTableRowsWithSchema())
  .apply(Convert.to(PojoClass.class));
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...