Как прочитать данные c из сообщения от kafka с помощью Spring-kafka @KafkaListener? - PullRequest
0 голосов
/ 27 января 2020

Моя задача - читать события из разных тем (класс всех данных во всех темах - «Событие»). Этот класс содержит поле «data» (Карта), которое содержит спецификацию c для каждого топи c данных, которая может быть десериализована в специфицированный класс c (например, в «DeviceCreateEvent» или что-то в этом роде). Я могу создать потребителей для каждой топи c с помощью @KafkaListener для методов с типом параметра «Событие». Но в этом случае, во-первых, мне нужно вызвать event.getData () и десериализовать его в конкретный класс c, поэтому я получу дублирование кода во всех потребительских методах. Есть ли способ получить в аннотированном потребительском методе уже десериализованный объект для указания c класса?

1 Ответ

0 голосов
/ 27 января 2020

Непонятно, о чем вы спрашиваете.

Если у вас есть разные @KafkaListener для каждого типа темы / события и вы используете JSON, среда автоматически сообщит конвертеру сообщений тип данных должен быть преобразован в; см. документацию .

Хотя API Serializer и Deserializer довольно прост и гибок с точки зрения низкоуровневого Kafka Consumer and Producer, вам может потребоваться больше гибкости в Spring Messaging уровень, при использовании @KafkaListener или Spring Integration. Чтобы упростить преобразование в org.springframework.messaging.Message и обратно, Spring для Apache Kafka предоставляет абстракцию MessageConverter с реализацией MessagingMessageConverter и его настройкой JsonMessageConverter (и подклассами). Вы можете внедрить MessageConverter в экземпляр KafkaTemplate напрямую и с помощью определения bean-компонента AbstractKafkaListenerContainerFactory для свойства @ KafkaListener.containerFactory (). В следующем примере показано, как это сделать: ...

На стороне потребителя вы можете настроить JsonMessageConverter; он может обрабатывать значения ConsumerRecord типа byte [], Bytes и String, поэтому его следует использовать вместе с ByteArrayDeserializer, BytesDeserializer или StringDeserializer. (byte [] и Bytes более эффективны, потому что они избегают ненужного преобразования byte [] в String). Вы также можете настроить специфицированный c подкласс JsonMessageConverter, соответствующий десериализатору, если вы так сделаете sh.

...