Весенняя кафка и кластер кафки - PullRequest
0 голосов
/ 11 июня 2018

Я настроил 3 kafka в кластере и пытаюсь использовать с spring-kafka.

Но после того, как я убил лидера kafka, я не смог отправить другие сообщения в очередь.

Я устанавливаю свойство spring.kafka.bootstrap-servers как: "kafka-1: 9092; kafka-2: 9093, kafka-3: 9094" и все имена в моем файле hosts.

Кафка версия 0.10

Кто-нибудь знает, как правильно настроить?

Редактировать

Я проверил одну вещь и произошло странное поведение.При запуске сервиса я отправляю сообщение в тему (для принудительного создания)

Код:

@Bean
public KafkaSyncListener synchronousListener(MessageSender sender, KafkaProperties prop) {
    sender.send(prop.getSynchronousTopic(), "Message to force create the topic! Run, Forrest, Run!");
    return new KafkaSyncListener();
}

Итак, в этот раз я не запустил сервер kafka-1 (простоостальные) и произошло исключение:

org.springframework.kafka.core.KafkaProducerException: Не удалось отправить;Вложенное исключение - org.apache.kafka.common.errors.TimeoutException: не удалось обновить метаданные после 60000 мс.

Кажется, что spring-kafka просто пытается подключиться на первом сервере начальной загрузки.Я использую spring-kafka 1.3.5.RELEASE и kafka 0.10.1.1

Edit 2

Я сделал тест, который вы сделали.То же самое происходит, когда я удаляю первый док-контейнер (kafka-1), лидер которого изменился.Итак, Мой потребитель (весенний сервис) не может потреблять сообщения.Но когда я снова запускаю kafka-1, сервис получает все сообщения Мой потребитель ConcurrentKafkaListenerContainerFactory:

{
  key.deserializer=class
  org.apache.kafka.common.serialization.IntegerDeserializer,
  value.deserializer=class
  org.apache.kafka.common.serialization.StringDeserializer,
  max.poll.records=500,
  group.id=mongo-adapter-service,
  ssl.keystore.location=/certs/kafka.keystore.jks,
  bootstrap.servers=[kafka-2:9093, kafka-1:9092, kafka-3:9094],
auto.commit.interval.ms=100,
security.protocol=SSL,
max.request.size=5242880,
ssl.truststore.location=/certs/kafka.keystore.jks,
auto.offset.reset=earliest
}

1 Ответ

0 голосов
/ 11 июня 2018

Вам нужна запятая между адресами сервера, а не точка с запятой.

РЕДАКТИРОВАТЬ

Я только что провел тест без проблем:

spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094

и

@SpringBootApplication
public class So50804678Application {

    public static void main(String[] args) {
        SpringApplication.run(So50804678Application.class, args);
    }

    @KafkaListener(id = "foo", topics = "so50804678")
    public void in(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so50804678", 1, (short) 3);
    }

}

и

$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: so50804678   Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2

убили лидера, а

$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: so50804678   Partition: 0    Leader: 1   Replicas: 0,1,2 Isr: 1,2

и

$ kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9093 --topic so50804678

отправили сообщение ионо было получено приложением;в журнале нет ошибок, кроме ПРЕДУПРЕЖДЕНИЯ:

[Consumer clientId = consumer-1, groupId = foo] Не удалось установить соединение с узлом 0.Брокер может быть недоступен.

Затем я перезапустил мертвый сервер;остановил мое приложение;затем добавил этот код ...

@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
    return args -> {
        while(true) {
            System.out.println(template.send("so50804678", "foo").get().getRecordMetadata());
            Thread.sleep(3_000);
        }
    };
}

Снова, убийство текущего лидера не имело никакого эффекта;все восстановилось нормально.

Возможно, вам придется настроить свойства listeners / advertised.listeners в реквизитах вашего сервера.Поскольку все мои брокеры находятся на локальном хосте, я оставил их по умолчанию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...