JSON Сообщения потребляются строковым потребителем. Моя продукция отправляет два типа сообщений: Strings и Serialized JSON
@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group-id}", containerFactory = "${kafka.string-listener-container-factory}")
public void consume(@NotNull ConsumerRecord<String, String> cr, @Payload String payload) {
log.debug("Received asset id: {}, with key: {}, Partition: {}, Offset: {} ", payload, cr.key(), cr.partition(),
cr.offset());
}
@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group-id}", containerFactory = "${kafka.json-listener-container-factory}")
public void consumeAssetEvent(@NotNull ConsumerRecord<String, Event> cr, @Payload Event payload) {
log.debug("Received asset id: {}, with key: {}, Partition: {}, Offset: {} ", payload, cr.key(), cr.partition(),
cr.offset());
}
на стороне потребителя. У меня есть два потребителя: 1. прослушивание строкового сообщения 2. прослушивание json и десериализация объекта
Даже сообщения json потребляются слушателем String.