Я храню сообщения из темы Kafka в KeyValueStore, чтобы я мог запросить их позже. Я создаю таблицу KT следующим образом:
@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myMessages) {
Я настроил потребителя в моем application.yml следующим образом:
ОБНОВЛЕНО де / сериализатор пакет
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
materializedAs: all-messages
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.me.MyMessageDeserializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.me.MyMessageSerializer
Однако, когда я читаю из KeyValueStore, ключи возвращаются правильно как строки, но возвращаемые значения - это байтовые массивы, а не MyMessage. По какой-то причине мой пользовательский десериализатор не используется. Я попытался десериализовать сообщение самостоятельно, но мой десериализатор вышел из строя за исключением. Я установил точку останова на моем сериализаторе, и он никогда не вызывается. Мне ясно, что ни мой сериализатор, ни десериализатор не используются.
Какую конфигурацию мне не хватает, чтобы использовать мое пользовательское значение de / serializer? Нужно ли, чтобы де / сериализаторы находились в определенной упаковке?