Дополнительные байты с KafkaAvroSerializer - PullRequest
0 голосов
/ 05 июля 2019

Моя настройка следующая: я извлекаю XML-файлы с ftp-сервера, распаковываю их в POJO, сопоставляю их с классом, сгенерированным Avro, а затем перенаправляю его в Приемник Alpakkas примерно так :

Ftp.ls("/", ftpSettings)
  .filter(FtpFile::isFile)
  .mapAsyncUnordered(10,
    ftpFile -> {
      CompletionStage<ByteString> fetchFile =
        Ftp.fromPath(ftpFile.path(), ftpSettings).runWith(Sink.reduce((a, b) -> a), materializer);
      return fetchFile;
    })
  .map(b -> b.decodeString(Charsets.ISO_8859_1))
  .map(StringReader::new)
  .map(AlpakkaProducerDemo::unmarshalFile)
  .map(AlpakkaProducerDemo::convertToAvroSerializable)
  .map(a -> new ProducerRecord<>(kafkaTopic, a.id().toString(), a))
  .map(record -> ProducerMessage.single(record))
  .runWith(Producer.committableSink(producerSettings, kafkaProducer), materializer);

Проблема в том, что сериализация явно не работает должным образом. Например. Я хотел бы, чтобы ключ также был сериализован, хотя это всего лишь строка (требование, не спрашивайте). Конфиг для этого выглядит так:

Map<String, Object> kafkaAvroSerDeConfig = new HashMap<>();
kafkaAvroSerDeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
final KafkaAvroSerializer keyAvroSerializer = new KafkaAvroSerializer();
keyAvroSerializer.configure(kafkaAvroSerDeConfig, true);
final Serializer<Object> keySerializer = keyAvroSerializer;
final Config config = system.settings().config().getConfig("akka.kafka.producer");
final ProducerSettings producerSettings = ProducerSettings.create(config, keySerializer, valueSerializer)
  .withBootstrapServers(kafkaServer);

В Kafka это приводит к ключу с правильным содержимым, но с некоторыми (кажущимися) дополнительными байтами в начале строки: \u0000\u0000\u0000\u0000\u0001N. Как вы можете себе представить, это наносит ущерб ценности. Я подозреваю, что сериализация Avro не очень подходит для API конвертов, используемого Alpakka, поэтому может потребоваться заранее сериализовать в byte[] и использовать общий ByteSerializer. Однако тогда не было бы смысла использовать SchemaRegistry.

1 Ответ

1 голос
/ 05 июля 2019

Первые пять байтов связаны с версией формата сериализации (байт 0) и версией схемы Avro в реестре схем (байты 1-4): https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format.

Другойвариант может быть просто использовать Kafka Connect, с источником FTP и преобразованием XML.

...