Spring-Kafka @KafkaHandlers не потребляет соответствующие java объектов - PullRequest
0 голосов
/ 25 апреля 2020

Я знаю, что об этом спрашивали несколько раз, однако пока не могу найти решение. Я могу использовать указанный c Java объект моим @KafkaListener (на уровне класса) и, тем не менее, прекрасно работает, когда я пытаюсь использовать несколько различных JSON объектов из одной и той же топи c ( Я использую @KafkaListener на уровне класса и @KafkaHandler на уровне метода, каждый метод @KafkaHandler ожидает другого объекта), он всегда создает LinkedHashMap. Я могу проанализировать это, получить данные и выполнить фабричный шаблон для генерации различных экземпляров на основе поля json, однако я не хочу этого делать, когда Spring может автоматически определить указанный c @KafkaHandler для маршрутизации сообщения. , Как использовать разные JSON объекты из одной топи c с одним @ KafkaListener.

Я использую следующие конфиги:

public ConsumerFactory<String, Object> abcConsumerFactory() {       
    Map<String, Object> config = new HashMap<>();       
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG,"group-1");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(Object.class, false)); 

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {      
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(abcConsumerFactory());
    return factory;
    }

Если я использую реальный класс (Foo или Bar) вместо Object в приведенных выше конфигурациях он отлично работает для этих конкретных объектов. Однако, когда я пытаюсь обобщить, он не go указывает на c @KafkaHandler, а вместо этого переходит к @KafkaHandler с типом Payload LinkedHashMap (я пытался проверить, доставляется ли он @KafkaHandler)

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> abcListenerContainerFactory() {      
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(abcConsumerFactory());
    return factory;
    }

Class:  

    @KafkaListener(topics={"abc_gh"}, containerFactory = "abcListenerContainerFactory")
    @Service
    public class MyListener {

    @KafkaHandler
    public void consumeMessage(@Payload Foo f) {
        //only comes here when i use Foo in my configs instead of Object

    }

    @KafkaHandler
    public void consumeMessage22(@Payload Bar b) {
        //only comes here when i use Bar in my configs instead of Object

    }   

    @KafkaHandler
    public void consumeMessage77(@Payload LinkedHashMap bc) {

        //comes here when i use Object in the configs, even if i expect a Foo or a Bar object

    }               
    }

Одна вещь, которой я хочу поделиться, это то, что Производитель не использует Spring-Kafka.

Я не знаю чего мне не хватает, я много чего перепробовал но не повезло.

1 Ответ

1 голос
/ 25 апреля 2020

Как указано , в документации говорится:

При использовании методов @KafkaHandler полезная нагрузка должна быть уже преобразована в объект домена (чтобы можно было выполнить сопоставление). Используйте пользовательский десериализатор, JsonDeserializer или JsonMessageConverter с его TypePrecedence, установленной на TYPE_ID. См. Сериализация, Десериализация и Преобразование сообщений для получения дополнительной информации.

Для того, чтобы перейти к правильному методу, ConsumerRecord должен уже иметь правильный тип.

config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class );

Вы не предоставляете десериализатору какую-либо информацию о том, какой объект создать.

Когда Spring является производителем, он добавляет информацию о типе в заголовки записей, которые могут использоваться десериализатором для создания правильного типа.

Если информация о типе отсутствует, вы получите Map.

Производитель должен установить заголовки типов для этого работа.

Когда @KaflaListener находится на уровне метода, мы можем определить тип для создания из параметров метода. На уровне класса у нас есть ловушка-22 - мы не можем выбрать метод, если запись уже не была преобразована.

Ваш продюсер не должен знать фактический тип, но он по крайней мере имеет чтобы обеспечить заголовок, который можно использовать для поиска типа, который нам нужно преобразовать.

См. Типы отображения .

В качестве альтернативы, сериализатор производителя JSON должен быть настроен на добавление информации о типе в сам JSON.

Другой вариант - настраиваемый десериализатор, который «подсматривает» за JSON, чтобы определить, какой класс создать.

...