Может ли Kafka Streams потреблять сообщение в формате и создавать другой формат, такой как сообщение AVRO - PullRequest
0 голосов
/ 03 октября 2019

Я использую потоки kafka для получения строки JSON из одной темы, обработки и генерации ответа для сохранения в другой теме. Однако сообщение, которое необходимо создать в теме ответа, должно быть в формате avro.

Я попытался использовать ключ в качестве строки и значение в качестве SpecificAvroSerde

Следующиймой код для создания топологии:

StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> consumerStream =builder.stream(kafkaConfiguration.getConsumerTopic());
consumerStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerStream.to(kafkaConfiguration.getProducerTopic());

Ниже приведен мой конфиг

    if (schemaRegistry != null && schemaRegistry.length > 0) {
        streamsConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, String.join(",", schemaRegistry));          
    }
    streamsConfig.put(this.keySerializerKeyName, StringSerde.class);
    streamsConfig.put(this.valueSerialzerKeyName, SpecificAvroSerde.class);
    streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize);
    streamsConfig.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
    streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
    streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.parseInt(commitIntervalMs));
    streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfThreads);
    streamsConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
    streamsConfig.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
    streamsConfig.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
    streamsConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,StreamsConfig.OPTIMIZE);
    streamsConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionMode);
    streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);

Я вижу следующую ошибку при попытке попробовать пример:

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

1 Ответ

1 голос
/ 04 октября 2019

Проблема с Serdes значения ключа. Вы должны использовать правильные значения serdes при использовании потока и то же самое при публикации потока.

В случае, если вы вводите JSON и хотите опубликовать как Avro, вы можете сделать это следующим образом:

Properties streamsConfig= new Properties();
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, Object> consumerStream =builder.stream(kafkaConfiguration.getConsumerTopic(),Consumed.with(Serdes.String(), Serdes.String()));

// Replace AvroObjectClass with your avro object type
KStream<String,AvroObjectClass> consumerAvroStream = consumerStream.map(getKeyValueMapper(keyValueMapperClassName));
consumerAvroStream.to(kafkaConfiguration.getProducerTopic());
...