Не ставьте код в комментариях;трудно читать - отредактируйте вопрос.
Какую версию spring-kafka вы используете?
Вам нужен как минимум 2.1.x и брокер kafka> = 0.11, чтобы информация о типе могла передаваться в заголовках. См. Здесь .
В более ранних версиях необходимо настроить десериализатор с целевым типом.
РЕДАКТИРОВАТЬ
Iувидеть проблему;Сериализатор не знает тип содержимого контейнера (списка) из-за стирания типа, и он не пытается проанализировать коллекцию, чтобы найти тип.
Хотя мы, вероятно, можем это исправить для простых случаев, таких как коллекции,более сложные объекты (например, карты карт) были бы проблемой.
Возможно, сработал бы массив, а не список.
В то же время, вместо этого вы можете использовать конвертер сообщений,где мы используем тип параметра слушателя, чтобы определить ожидаемый тип.Вот пример:
@SpringBootApplication
public class So53665459Application2 {
public static void main(String[] args) {
SpringApplication.run(So53665459Application2.class, args);
}
@Bean
public NewTopic topic() {
return new NewTopic("so53665459", 1, (short) 1);
}
@Bean
public MessageConverter converter() {
return new StringJsonMessageConverter();
}
@KafkaListener(id = "so53665459", topics = "so53665459")
public void listen(List<Foo1> foos) {
System.out.println(foos);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, List<Object>> template) {
return args -> template.send("so53665459", Collections.singletonList(new Foo1("baz1")));
}
public static class Foo1 {
private String bar;
public Foo1() {
super();
}
Foo1(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo1 [bar=" + this.bar + "]";
}
}
}
и
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
#spring.kafka.consumer.value-deserializer= boot's default StringDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
и
[Foo1 [bar=baz1]]
Кроме того, вы можете создать подкласс сериализатора для установки заголовков с соответствующими типами, чтобы помочь десериализатору.