Как десериализовать список json с объектом в моем потребителе кафки? - PullRequest
2 голосов
/ 25 апреля 2020

My Kafka Producer отправляет список объектов в формате Json.

Я пытаюсь выяснить, как заставить моего потребителя десериализовать список. Я могу получить один объект и прочитать его, но когда я изменяю код на тип List, я получаю следующую ошибку:

Error:(32, 47) java: incompatible types: cannot infer type arguments for org.springframework.kafka.core.DefaultKafkaConsumerFactory<>
    reason: inference variable V has incompatible equality constraints java.util.List<nl.domain.X>,nl.domain.X

EDIT

Эта ошибка была решена путем добавления TypeReference к JsonDeserilizer.

Текущая проблема :

При использовании сообщения оно не относится к типу, который я определил (то есть List ), но возвращающий LinkedHashMap

Это Consumer конфигурация:

@EnableKafka
@Configuration
public class KafkaConfiguration {

    @Bean
    public ConsumerFactory<String, List<X>> xConsumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                new JsonDeserializer<>(new TypeReference<List<X>>() {}));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, List<X>> xKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, List<X>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(xConsumerFactory());
        return factory;
    }
}

Это Потребитель :

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "test", groupId = "group_json", containerFactory = "xKafkaListenerFactory")
    public void consume(List<X> x) {
        // In reality it is consuming LinkedHashMap which isn't what i want
        x.forEach(i ->
                System.out.println("Consumed message: " + i.getName()));
    }
}

Это Производитель конфигурация:

@Configuration
public class KafkaConfiguration {
    @Bean
    public ProducerFactory<String, List<X>> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, List<X>> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Это производитель :

@Component
public class KafkaProducer {

    private final static String TOPIC = "test";
    private final KafkaTemplate<String, List<X>> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, List<X>> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void publishMessage(List<X> x) {
        // Created 3 instances of X for current example
        List<X> list = new ArrayList();
        list.add(new X("Apple"));
        list.add(new X("Beach"));
        list.add(new X("Money"));
        ListenableFuture<SendResult<String, List<X>>> listenableFuture = kafkaTemplate.send(TOPIC, list);
    }
}

Резюме :

Кажется, что продюсер работает нормально. Я получаю следующую ошибку в моем потребителе при отправке такого сообщения. Он не может привести LinkedHashMap в список.

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void nl.infrastructure.input.message.kafka.consumer.KafkaConsumer.consume(java.util.List<nl.domain.X>)' threw exception; nested exception is java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class nl.domain.X (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; nl.domain.X is in unnamed module of loader 'app'); nested exception is java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class nl.domain.X (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; nl.domain.X is in unnamed module of loader 'app')

Ответы [ 2 ]

2 голосов
/ 28 апреля 2020

Это может быть связано с неправильным типом импорта JsonDeserializer

Использовать import org.springframework.kafka.support.serializer.JsonDeserializer;

Вы должны настроить JsonDeserializer, как показано ниже:

protected Deserializer<List<X>> kafkaDeserializer() {
    ObjectMapper om = new ObjectMapper();
    om.getTypeFactory().constructParametricType(List.class, X.class);
    return new JsonDeserializer<>(om);
}
1 голос
/ 06 мая 2020

Использование ответа @AmanGarg. Немного подправил. (Не знаю, почему это не сработало для меня.) Преобразование одного Class в POJO работало, но в List<X> не было.
Добавление его, только если кто-то сталкивается с этой проблемой.

protected JsonDeserializer<List<X>> kafkaDeserializer() {
    ObjectMapper om = new ObjectMapper();
    JavaType type = om.getTypeFactory().constructParametricType(List.class, X.class);
    return new JsonDeserializer<List<X>>(type, om, false);
}

Используется другой конструктор JsonDeserializer.

...