Как установить значение de / serializer для Kafka KeyValueStore? - PullRequest
1 голос
/ 03 июля 2019

Я храню сообщения из темы Kafka в KeyValueStore, чтобы я мог запросить их позже. Я создаю таблицу KT следующим образом:

@StreamListener public void process(@Input("input") KTable<String,MyMessage> myMessages) {

Я настроил потребителя в моем application.yml следующим образом:

ОБНОВЛЕНО де / сериализатор пакет

spring.cloud.stream.kafka.streams.bindings.input: consumer: materializedAs: all-messages key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: com.me.MyMessageDeserializer key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: com.me.MyMessageSerializer

Однако, когда я читаю из KeyValueStore, ключи возвращаются правильно как строки, но возвращаемые значения - это байтовые массивы, а не MyMessage. По какой-то причине мой пользовательский десериализатор не используется. Я попытался десериализовать сообщение самостоятельно, но мой десериализатор вышел из строя за исключением. Я установил точку останова на моем сериализаторе, и он никогда не вызывается. Мне ясно, что ни мой сериализатор, ни десериализатор не используются.

Какую конфигурацию мне не хватает, чтобы использовать мое пользовательское значение de / serializer? Нужно ли, чтобы де / сериализаторы находились в определенной упаковке?

1 Ответ

1 голос
/ 03 июля 2019

В application.yml использовались неправильные ключи конфигурации.Вместо key-deserializer: это должен быть keySerde: и вместо value-deserializer: это должен быть valueSerde.Ниже приведена правильная конфигурация:

spring.cloud.stream.kafka.streams.bindings.input: consumer: materializedAs: all-messages keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: com.me.MyMessageSerde producer: keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: com.me.MyMessageSerde

...