Вам нужна запятая между адресами сервера, а не точка с запятой.
РЕДАКТИРОВАТЬ
Я только что провел тест без проблем:
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
и
@SpringBootApplication
public class So50804678Application {
public static void main(String[] args) {
SpringApplication.run(So50804678Application.class, args);
}
@KafkaListener(id = "foo", topics = "so50804678")
public void in(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return new NewTopic("so50804678", 1, (short) 3);
}
}
и
$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: so50804678 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
убили лидера, а
$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: so50804678 Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2
и
$ kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9093 --topic so50804678
отправили сообщение ионо было получено приложением;в журнале нет ошибок, кроме ПРЕДУПРЕЖДЕНИЯ:
[Consumer clientId = consumer-1, groupId = foo] Не удалось установить соединение с узлом 0.Брокер может быть недоступен.
Затем я перезапустил мертвый сервер;остановил мое приложение;затем добавил этот код ...
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
while(true) {
System.out.println(template.send("so50804678", "foo").get().getRecordMetadata());
Thread.sleep(3_000);
}
};
}
Снова, убийство текущего лидера не имело никакого эффекта;все восстановилось нормально.
Возможно, вам придется настроить свойства listeners / advertised.listeners в реквизитах вашего сервера.Поскольку все мои брокеры находятся на локальном хосте, я оставил их по умолчанию.