Обновление схемы при записи в файлы Avro - PullRequest
0 голосов
/ 24 января 2020

Контекст: У нас есть задание потока данных, которое преобразует сообщения PubSub в Avro GenericRecords и записывает их в GCS как «.avro». Преобразование между сообщениями PubSub и GenericRecords требует схемы. Эта схема меняется еженедельно только с добавлением полей. Мы хотим иметь возможность обновлять поля без обновления задания Dataflow.

Что мы сделали: Мы воспользовались советом этого поста и создали кеш Guava, который обновляет контент каждую минуту. Функция refre sh будет извлекать схему из GCS. Затем мы имеем FileIO.write, запрашивающий кэш Guava, чтобы получить последнюю схему и преобразовать элементы со схемой как GenericRecord. У нас также есть выходы FileIO.write для приемника Avro, который также создается с использованием схемы.

Код выглядит следующим образом:

genericRecordsAsByteArrays.apply(FileIO.<byte[]>write()
    .via(fn((input, c) -> {
          Map<String, Object> schemaInfo = cache.get("");
          Descriptors.Descriptor paymentRecordFd =
              (Descriptors.Descriptor) schemaInfo.get(DESCRIPTOR_KEY);
          DynamicMessage paymentRecordMsg = DynamicMessage.parseFrom(paymentRecordFd, input);
          Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);

          //From concrete PaymentRecord bytes to DynamicMessage
          try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
            ProtobufDatumWriter<DynamicMessage> pbWriter = new ProtobufDatumWriter<>(schema);
            pbWriter.write(paymentRecordMsg, encoder);
            encoder.flush();

            // From dynamic message to GenericRecord
            byte[] avroContents = output.toByteArray();
            DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroContents, null);
            return reader.read(null, decoder);
          }
        }, requiresSideInputs()),
        fn((output, c) -> {
          Map<String, Object> schemaInfo = cache.get("");
          Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);
          return AvroIO.sink(schema).withCodec(CodecFactory.snappyCodec());
        }, requiresSideInputs()))
    .withNumShards(5)
    .withNaming(new PerWindowFilenames(baseDir, ".avro"))
    .to(baseDir.toString()));

Мои вопросы:

  1. Что произойдет, когда мы записываем в один файл Avro, но внезапно происходит обновление схемы, и теперь мы записываем новую схему в файл Avro, созданный со старой схемой?
  2. Запускает ли Dataflow новый файл, когда видит новую схему?
  3. Поток данных игнорирует новую схему и дополнительные поля, пока не будет создан новый файл?

Каждый файл Avro имеет свою собственную схему в самом начале файла, поэтому я не уверен, каково ожидаемое поведение.

1 Ответ

1 голос
/ 28 января 2020

теперь мы записываем новую схему в файл Avro, созданный со старой схемой

Это невозможно. Каждый файл Avro имеет только одну схему. Если он изменится по определению, вы будете писать в новый файл.

Я сомневаюсь, что Поток данных игнорирует поля.

...