Я знаю, что об этом спрашивали несколько раз, однако пока не могу найти решение. Я могу использовать указанный c Java объект моим @KafkaListener (на уровне класса) и, тем не менее, прекрасно работает, когда я пытаюсь использовать несколько различных JSON объектов из одной и той же топи c ( Я использую @KafkaListener на уровне класса и @KafkaHandler на уровне метода, каждый метод @KafkaHandler ожидает другого объекта), он всегда создает LinkedHashMap. Я могу проанализировать это, получить данные и выполнить фабричный шаблон для генерации различных экземпляров на основе поля json, однако я не хочу этого делать, когда Spring может автоматически определить указанный c @KafkaHandler для маршрутизации сообщения. , Как использовать разные JSON объекты из одной топи c с одним @ KafkaListener.
Я использую следующие конфиги:
public ConsumerFactory<String, Object> abcConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Object.class, false));
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(abcConsumerFactory());
return factory;
}
Если я использую реальный класс (Foo или Bar) вместо Object в приведенных выше конфигурациях он отлично работает для этих конкретных объектов. Однако, когда я пытаюсь обобщить, он не go указывает на c @KafkaHandler, а вместо этого переходит к @KafkaHandler с типом Payload LinkedHashMap (я пытался проверить, доставляется ли он @KafkaHandler)
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(abcConsumerFactory());
return factory;
}
Class:
@KafkaListener(topics={"abc_gh"}, containerFactory = "abcListenerContainerFactory")
@Service
public class MyListener {
@KafkaHandler
public void consumeMessage(@Payload Foo f) {
//only comes here when i use Foo in my configs instead of Object
}
@KafkaHandler
public void consumeMessage22(@Payload Bar b) {
//only comes here when i use Bar in my configs instead of Object
}
@KafkaHandler
public void consumeMessage77(@Payload LinkedHashMap bc) {
//comes here when i use Object in the configs, even if i expect a Foo or a Bar object
}
}
Одна вещь, которой я хочу поделиться, это то, что Производитель не использует Spring-Kafka.
Я не знаю чего мне не хватает, я много чего перепробовал но не повезло.