При аутентификации с неверными учетными данными во время аутентификации я получаю следующее ожидаемое сообщение:
[Consumer clientId = consumer-1, groupId = xxx] Соединение с узлом -1 прервано во время аутентификации.Это может указывать на то, что аутентификация завершилась неудачно из-за неверных учетных данных.
Это все хорошо, но сейчас я могу найти эту информацию только в файле журнала, в то время как я хотел бы принять меры по этому сообщению.
Я также посмотрел исходный код Kafka, откуда исходит это сообщение:
(в 1.1) https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (или в транке) https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java)
Вот такой код:
switch (disconnectState.state()) {
case AUTHENTICATION_FAILED:
connectionStates.authenticationFailed(nodeId, now, disconnectState.exception());
log.error("Connection to node {} failed authentication due to: {}", nodeId, disconnectState.exception().getMessage());
break;
case AUTHENTICATE:
// This warning applies to older brokers which dont provide feedback on authentication failures
log.warn("Connection to node {} terminated during authentication. This may indicate " +
"that authentication failed due to invalid credentials.", nodeId);
break;
Важное примечание : я переопределяю механизм аутентификации в Kafka с помощью пользовательского модуля, основанного на SASL_PLAIN, и пока я только отлаживал, я кое-что понялв этом модуле во время аутентификации произошел сбой, что вызвало исключение, отличное от этого модуля, что означало, что он вызвал этот путь к коду «более старого брокера».Но все же я хочу предпринять некоторые действия в этом состоянии.
Я попытался создать прослушиватель, например:
final ContainerProperties containerProperties = new ContainerProperties("TEST_TOPIC");
containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
containerProperties.setMessageListener((AcknowledgingMessageListener<GenericRecord, GenericRecord>) (record, ack) -> logger.info("Got record on test topic"));
KafkaMessageListenerContainer<GenericRecord, GenericRecord> container = new KafkaMessageListenerContainer<>(kafkaConsumerFactory, containerProperties);
container.start();
Но код не генерирует никаких исключений аутентификации, этопросто продолжает печатать вышеприведенное сообщение в журнале.
Я также пытался создать простого потребителя, например:
Consumer<GenericRecord, GenericRecord> consumer = kafkaConsumerFactory.createConsumer();
consumer.subscribe(Collections.singleton("BX_TEST_TOPIC"));
try {
ConsumerRecords<GenericRecord, GenericRecord> poll = consumer.poll(10);
}
catch (Exception e) {
logger.info("Exception during polling", e);
}
Но этот код также не выдает никаких исключений, он также простопродолжает печатать сообщение в журнале.Даже если в соответствии с https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-long- метод poll может вызвать исключение AuthenticationException.Вероятно, это связано с тем, что сообщение «более старого брокера» в исходном коде показывает только предупреждение в журнале.
Итак, как на самом деле перехватить любое исключение AuthenticationException или получить состояние соединения каким-либо значимым способомвместо анализа журнала, если сервер не возвращает исключение аутентификации, но все еще не может войти?
Обратите внимание, точно такая же проблема возникает, когда аутентификация не является проблемой, но что-то еще не удается.Например, иногда мой кластер Kafka не запускается правильно, и в этом случае я получаю следующее сообщение:
Невозможно установить соединение с узлом -1.Брокер может быть недоступен.
, который я тоже не могу проверить.Вместо этого журнал просто заполняется этим сообщением, когда клиент пытается подключиться к бесконечному циклу.