Потребитель Kafka получает сообщение в другом формате, когда производитель отправляет в виде списка <Object> - PullRequest
0 голосов
/ 07 декабря 2018
I am sending `List<Object>` from Kafka Producer to a topic. A consumer which listens to the topic receives the message but in a different format(i.e. `List<LinkedHashMap>` instead of `List<Object>`)

Any idea on how to receive the List<Object> messages in the consumer?I am sending `List<foo>`. But in the Consumer, data comes as `List<LinkedHashMap>`      

Просто изменив приведенный ниже пример в виде списка https://memorynotfound.com/spring-kafka-json-serializer-deserializer-example/ По ссылке вы можете найти Отправителя и Слушателя.Единственное изменение - Отправитель возьмет Объект вместо Foo.

public class SpringKafkaApplication implements CommandLineRunner {
        public static void main(String[] args) {
                SpringApplication.run(SpringKafkaApplication.class, args);
            }

            @Autowired
            private FooSender sender;

            @Override
            public void run(String... strings) throws Exception {
            List<Foo> f = new ArrayList<>();

        f.add(new Foo("Spring Kafka", "sending and receiving JSON messages"))
        f.add(new Foo("Spring Kafka1", "sending and receiving11"))
                sender.send(f);
            }
        }

1 Ответ

0 голосов
/ 07 декабря 2018

Не ставьте код в комментариях;трудно читать - отредактируйте вопрос.

Какую версию spring-kafka вы используете?

Вам нужен как минимум 2.1.x и брокер kafka> = 0.11, чтобы информация о типе могла передаваться в заголовках. См. Здесь .

В более ранних версиях необходимо настроить десериализатор с целевым типом.

РЕДАКТИРОВАТЬ

Iувидеть проблему;Сериализатор не знает тип содержимого контейнера (списка) из-за стирания типа, и он не пытается проанализировать коллекцию, чтобы найти тип.

Хотя мы, вероятно, можем это исправить для простых случаев, таких как коллекции,более сложные объекты (например, карты карт) были бы проблемой.

Возможно, сработал бы массив, а не список.

В то же время, вместо этого вы можете использовать конвертер сообщений,где мы используем тип параметра слушателя, чтобы определить ожидаемый тип.Вот пример:

@SpringBootApplication
public class So53665459Application2 {

    public static void main(String[] args) {
        SpringApplication.run(So53665459Application2.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so53665459", 1, (short) 1);
    }

    @Bean
    public MessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    @KafkaListener(id = "so53665459", topics = "so53665459")
    public void listen(List<Foo1> foos) {
        System.out.println(foos);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, List<Object>> template) {
        return args -> template.send("so53665459", Collections.singletonList(new Foo1("baz1")));
    }

    public static class Foo1 {

        private String bar;

        public Foo1() {
            super();
        }

        Foo1(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo1 [bar=" + this.bar + "]";
        }

    }

}

и

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
#spring.kafka.consumer.value-deserializer= boot's default StringDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest

и

[Foo1 [bar=baz1]]

Кроме того, вы можете создать подкласс сериализатора для установки заголовков с соответствующими типами, чтобы помочь десериализатору.

...