Как получить состояние соединения Kafka без разбора файла журнала?Чтобы можно было выполнить действие «Соединение с узлом прервано во время аутентификации». - PullRequest
0 голосов
/ 04 июня 2018

При аутентификации с неверными учетными данными во время аутентификации я получаю следующее ожидаемое сообщение:

[Consumer clientId = consumer-1, groupId = xxx] Соединение с узлом -1 прервано во время аутентификации.Это может указывать на то, что аутентификация завершилась неудачно из-за неверных учетных данных.

Это все хорошо, но сейчас я могу найти эту информацию только в файле журнала, в то время как я хотел бы принять меры по этому сообщению.

Я также посмотрел исходный код Kafka, откуда исходит это сообщение:

(в 1.1) https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (или в транке) https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java)

Вот такой код:

switch (disconnectState.state()) {
            case AUTHENTICATION_FAILED:
                connectionStates.authenticationFailed(nodeId, now, disconnectState.exception());
                log.error("Connection to node {} failed authentication due to: {}", nodeId, disconnectState.exception().getMessage());
                break;
            case AUTHENTICATE:
                // This warning applies to older brokers which dont provide feedback on authentication failures
                log.warn("Connection to node {} terminated during authentication. This may indicate " +
                        "that authentication failed due to invalid credentials.", nodeId);
                break;

Важное примечание : я переопределяю механизм аутентификации в Kafka с помощью пользовательского модуля, основанного на SASL_PLAIN, и пока я только отлаживал, я кое-что понялв этом модуле во время аутентификации произошел сбой, что вызвало исключение, отличное от этого модуля, что означало, что он вызвал этот путь к коду «более старого брокера».Но все же я хочу предпринять некоторые действия в этом состоянии.

Я попытался создать прослушиватель, например:

    final ContainerProperties containerProperties = new ContainerProperties("TEST_TOPIC");
    containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    containerProperties.setMessageListener((AcknowledgingMessageListener<GenericRecord, GenericRecord>) (record, ack) -> logger.info("Got record on test topic"));
    KafkaMessageListenerContainer<GenericRecord, GenericRecord> container = new KafkaMessageListenerContainer<>(kafkaConsumerFactory, containerProperties);
    container.start();

Но код не генерирует никаких исключений аутентификации, этопросто продолжает печатать вышеприведенное сообщение в журнале.

Я также пытался создать простого потребителя, например:

    Consumer<GenericRecord, GenericRecord> consumer = kafkaConsumerFactory.createConsumer();
    consumer.subscribe(Collections.singleton("BX_TEST_TOPIC"));
    try {
        ConsumerRecords<GenericRecord, GenericRecord> poll = consumer.poll(10);
    }
    catch (Exception e) {
        logger.info("Exception during polling", e);
    }

Но этот код также не выдает никаких исключений, он также простопродолжает печатать сообщение в журнале.Даже если в соответствии с https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-long- метод poll может вызвать исключение AuthenticationException.Вероятно, это связано с тем, что сообщение «более старого брокера» в исходном коде показывает только предупреждение в журнале.

Итак, как на самом деле перехватить любое исключение AuthenticationException или получить состояние соединения каким-либо значимым способомвместо анализа журнала, если сервер не возвращает исключение аутентификации, но все еще не может войти?

Обратите внимание, точно такая же проблема возникает, когда аутентификация не является проблемой, но что-то еще не удается.Например, иногда мой кластер Kafka не запускается правильно, и в этом случае я получаю следующее сообщение:

Невозможно установить соединение с узлом -1.Брокер может быть недоступен.

, который я тоже не могу проверить.Вместо этого журнал просто заполняется этим сообщением, когда клиент пытается подключиться к бесконечному циклу.

1 Ответ

0 голосов
/ 04 июня 2018

Вы можете уменьшить цикл за счет увеличения отката повторных попыток.

Вы можете проверить состояние посредника с помощью чего-то вроде ...

spring.kafka.producer.properties.retry.backoff.ms=1000
spring.kafka.producer.properties.max.block.ms=10000
spring.kafka.bootstrap-servers=localhost:9096

и

@Bean
public ApplicationRunner runner(ProducerFactory<?, ?> producerFactory) {
    return args -> {
        try (Producer<?, ?> producer = producerFactory.createProducer()) {
            producer.partitionsFor("foo");
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    };
}

и

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 10000 ms.

Я уменьшил max.block.ms, потому что по умолчанию это 60 секунд.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...