Avro Schema для GenericRecord: возможность оставлять пустые поля - PullRequest
0 голосов
/ 08 марта 2020

Я использую Java для преобразования JSON в Avro и сохранения их в GCS с помощью Google DataFlow. Схема Avro создается во время выполнения с использованием SchemaBuilder.

Одним из полей, которые я определяю в схеме, является необязательное поле LONG, оно определяется следующим образом:

SchemaBuilder.FieldAssembler<Schema> fields = SchemaBuilder.record(mainName).fields();
Schema concreteType = SchemaBuilder.nullable().longType();
fields.name("key1").type(concreteType).noDefault();

Теперь, когда я создаю GenericRecord, используя схему выше, и "key1" не устанавливается, когда помещаю полученный GenericRecord в контекст моего DoFn: context.output(res); Я получаю следующую ошибку:

Исключение в потоке "main "org. apache .beam.sdk.Pipeline $ PipelineExecutionException: org. apache .avro.UnresolvedUnionException: не в объединении [" long "," null "]: 256

Я также попытался сделать то же самое с withDefault(0L) и получил тот же результат.

Что мне не хватает? Спасибо

Ответы [ 2 ]

1 голос
/ 10 марта 2020

Это работает нормально для меня при попытке, как показано ниже, и вы можете попробовать распечатать схему, которая поможет сравнить, также вы можете удалить nullable () для длинного типа, чтобы попробовать.

fields.name("key1").type().nullable().longType().longDefault(0);

При условии полный код, который я использовал для проверки:

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
import org.apache.avro.SchemaBuilder.RecordBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

import java.io.File;
import java.io.IOException;

public class GenericRecordExample {

  public static void main(String[] args) {

    FieldAssembler<Schema> fields;
    RecordBuilder<Schema> record = SchemaBuilder.record("Customer");
    fields = record.namespace("com.example").fields();
    fields = fields.name("first_name").type().nullable().stringType().noDefault();
    fields = fields.name("last_name").type().nullable().stringType().noDefault();
    fields = fields.name("account_number").type().nullable().longType().longDefault(0);

    Schema schema = fields.endRecord();
    System.out.println(schema.toString());

    // we build our first customer
    GenericRecordBuilder customerBuilder = new GenericRecordBuilder(schema);
    customerBuilder.set("first_name", "John");
    customerBuilder.set("last_name", "Doe");
    customerBuilder.set("account_number", 999333444111L);
    Record myCustomer = customerBuilder.build();
    System.out.println(myCustomer);

    // writing to a file
    final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
      dataFileWriter.create(myCustomer.getSchema(), new File("customer-generic.avro"));
      dataFileWriter.append(myCustomer);
      System.out.println("Written customer-generic.avro");
    } catch (IOException e) {
      System.out.println("Couldn't write file");
      e.printStackTrace();
    }

    // reading from a file
    final File file = new File("customer-generic.avro");
    final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    GenericRecord customerRead;
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader)){
      customerRead = dataFileReader.next();
      System.out.println("Successfully read avro file");
      System.out.println(customerRead.toString());

      // get the data from the generic record
      System.out.println("First name: " + customerRead.get("first_name"));

      // read a non existent field
      System.out.println("Non existent field: " + customerRead.get("not_here"));
    }
    catch(IOException e) {
      e.printStackTrace();
    }
  }
}
0 голосов
/ 10 марта 2020

Если я правильно понимаю ваш вопрос, вы пытаетесь принять строки JSON и сохранить их в хранилище Cloud Storage, используя Avro в качестве кодера для данных при их перемещении по потоку данных. Из вашего кода нет ничего очевидного для меня. Я сделал это, включая сохранение данных в облачном хранилище и в BigQuery.

Возможно, вы решите использовать более простой и, вероятно, менее подверженный ошибкам подход: определите класс Java для своих данных и используйте аннотации Avro на это чтобы кодер работал правильно. Вот пример:

import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Data {
    public long nonNullableValue;
    @Nullable public long nullableValue;
}

Затем используйте этот тип в ваших DnFn реализациях, как вы, вероятно, уже сделали. Beam должен уметь правильно перемещать данные между работниками с помощью Avro, даже если поля, помеченные @Nullable, имеют нулевое значение.

...