Я пытаюсь реализовать потребителя Kafka с аутентификацией Kerberos, но уже несколько дней сталкиваюсь с разными проблемами, которые приводят к разным исключениям. Текущая ситуация такова: я разрабатываю приложение Spring Boot, которое должно потреблять данные из топика Kafka в сети. Этот топик требует, чтобы потребители и производители аутентифицировались с помощью Kerberos. Разработка ведётся на Windows.
На данный момент я получаю следующую ошибку:
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-07-08 10:44:12.590 ERROR 11336 --- [ main] o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
at com.kafka.kerberos.auth.AuthApplication.main(AuthApplication.java:10) [classes/:na]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631) ~[kafka-clients-2.5.0.jar:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:595) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:295) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
... 14 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Unable to obtain password from user
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:743) ~[kafka-clients-2.5.0.jar:na]
... 28 common frames omitted
Caused by: javax.security.auth.login.LoginException: Unable to obtain password from user
at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Unknown Source) ~[na:1.8.0_251]
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Unknown Source) ~[na:1.8.0_251]
at com.sun.security.auth.module.Krb5LoginModule.login(Unknown Source) ~[na:1.8.0_251]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_251]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_251]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_251]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_251]
at javax.security.auth.login.LoginContext.invoke(Unknown Source) ~[na:1.8.0_251]
at javax.security.auth.login.LoginContext.access$000(Unknown Source) ~[na:1.8.0_251]
at javax.security.auth.login.LoginContext$4.run(Unknown Source) ~[na:1.8.0_251]
at javax.security.auth.login.LoginContext$4.run(Unknown Source) ~[na:1.8.0_251]
at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_251]
at javax.security.auth.login.LoginContext.invokePriv(Unknown Source) ~[na:1.8.0_251]
at javax.security.auth.login.LoginContext.login(Unknown Source) ~[na:1.8.0_251]
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:103) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112) ~[kafka-clients-2.5.0.jar:na]
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158) ~[kafka-clients-2.5.0.jar:na]
... 32 common frames omitted
В папке ресурсов приложения Spring Boot у меня лежат файл keytab, файл конфигурации Kerberos krb5.conf и сертификат JKS; эти файлы предоставил администратор топика Kafka, к которому я пытаюсь получить доступ.
Мой файл krb5.conf:
// JAAS login entry named "KafkaClient": a Kerberos login through Krb5LoginModule using a keytab.
// NOTE(review): presumably this is the entry the Kafka client looks up for SASL/GSSAPI - confirm against the Kafka security docs.
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
// NOTE(review): with prompting disabled, a keytab that cannot be read (or that lacks the principal below)
// would be consistent with "LoginException: Unable to obtain password from user" - verify.
doNotPrompt=true
// NOTE(review): relative path; presumably resolved against the JVM working directory rather than the
// classpath/resources folder - confirm, and consider an absolute path.
keyTab="my-keytab.keytab"
// NOTE(review): kerberos_realm, ssl.endpoint.identification.algorithm and network.auth.use-sspi do not look
// like Krb5LoginModule options - check the Krb5LoginModule Javadoc; they may belong in other config files.
kerberos_realm = "myRealm@kerberos"
principal="myPrincipla@kerberos"
ssl.endpoint.identification.algorithm=""
network.auth.use-sspi=false;
};
// JAAS login entry named "Client": a Kerberos login through Krb5LoginModule using a keytab,
// with the ticket cache disabled and module debug output enabled.
// NOTE(review): which component reads the "Client" entry is not visible here - confirm it is needed.
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
useTicketCache=false
principal="myPrincipla@kerberos"
doNotPrompt=true
// NOTE(review): relative path; presumably resolved against the JVM working directory rather than the
// classpath/resources folder - confirm, and consider an absolute path.
keyTab="my-keytab.keytab"
// NOTE(review): kerberos_realm, ssl.endpoint.identification.algorithm and network.auth.use-sspi do not look
// like Krb5LoginModule options - check the Krb5LoginModule Javadoc.
kerberos_realm = "myRealm@kerberos"
debug=true
ssl.endpoint.identification.algorithm=""
network.auth.use-sspi=false;
};
В качестве конфигурации для моего потребителя я использую следующий код:
package com.kafka.kerberos.auth;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
/**
 * Spring configuration for a Kafka consumer that authenticates over SASL.
 *
 * <p>Settings are read from {@code classpath:kafka.properties}. The JAAS login
 * configuration (the classpath resource {@code krb5.conf}) and the SSL truststore
 * ({@code truststore.jks}) are resolved from the classpath to absolute filesystem
 * paths.
 *
 * <p>NOTE(review): the resource named {@code krb5.conf} is passed to
 * {@code java.security.auth.login.config}, i.e. it is used as a JAAS login file.
 * A real Kerberos realm/KDC configuration, if required, would normally be supplied
 * through {@code java.security.krb5.conf} - confirm with the cluster administrator.
 */
@Configuration
@PropertySource("classpath:kafka.properties")
public class ConsumerConfiguration {

    @Value("${kafka.consumer.enable.topic.auto-creation}")
    private Boolean autoCreationTopic;

    @Value("${kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.auto.offset.reset}")
    private String consumerAutoOffsetReset;

    @Value("${kafka.consumer.group.id}")
    public String consumerGroupId;

    @Value("${kafka.consumer.client.id}")
    public String consumerClientId;

    @Value("${kafka.consumer.enable.auto.commit}")
    public Boolean consumerAutoCommitFlag;

    @Value("${kafka.consumer.commit.sync}")
    public Boolean commitSync;

    // Kerberos
    @Value("${kafka.consumer.sasl.mechanism}")
    private String saslMechanism;

    @Value("${kafka.consumer.security.protocol}")
    private String securityProtocol;

    @Value("${kafka.service.name}")
    private String kerberosServiceName;

    /**
     * Builds the property map handed to the Kafka consumer.
     *
     * <p>Side effect: sets the JVM-wide system property
     * {@code java.security.auth.login.config} to the absolute path of the
     * {@code krb5.conf} classpath resource.
     *
     * @return consumer properties, including the SASL and SSL truststore settings
     * @throws IOException if {@code krb5.conf} or {@code truststore.jks} cannot be
     *     resolved to a file on the filesystem (for example when packaged inside a jar)
     */
    @Bean
    public Map<String, Object> consumerConfigs() throws ClassNotFoundException, IOException {
        // Use real filesystem paths instead of URL paths: a URL path is percent-encoded
        // (spaces become %20) and on Windows starts with "/C:/...". Stripping the first
        // character by hand only works on Windows and breaks absolute paths elsewhere.
        String jaasConfigPath = absolutePathOf(new ClassPathResource("krb5.conf"));
        System.setProperty("java.security.auth.login.config", jaasConfigPath);

        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, autoCreationTopic);
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
        consumerProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerAutoCommitFlag);
        consumerProperties.put("security.protocol", securityProtocol); /* here I use "SASL_SSL" */
        consumerProperties.put("sasl.mechanism", saslMechanism); /* here I use "GSSAPI" */
        consumerProperties.put("sasl.kerberos.service.name", kerberosServiceName); /* here I use "kafka" */
        consumerProperties.put("ssl.truststore.location",
                absolutePathOf(new ClassPathResource("truststore.jks")));
        // NOTE(review): the truststore password is hard-coded; consider moving it to
        // externalised configuration.
        consumerProperties.put("ssl.truststore.password", "my-password-for-jks");
        return consumerProperties;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * <p>Failures while building the configuration are propagated rather than
     * swallowed, so a broken setup fails with its real cause instead of yielding
     * a {@code null} bean.
     *
     * @return a consumer factory backed by the consumer properties
     * @throws IOException if the configuration files cannot be resolved
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() throws ClassNotFoundException, IOException {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Creates the listener container factory wired to {@link #consumerFactory()}.
     *
     * @return the container factory referenced by listeners via its bean name
     * @throws IOException if the configuration files cannot be resolved
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() throws ClassNotFoundException, IOException {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /** Resolves a classpath resource to its absolute, platform-native filesystem path. */
    private static String absolutePathOf(ClassPathResource resource) throws IOException {
        return resource.getFile().getAbsolutePath();
    }
}
А для реализации потребителя у меня следующий код:
package com.kafka.kerberos.auth;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Repository;
/**
 * Kafka listener component, registered with Spring through {@code @Repository}.
 *
 * <p>Implements {@code KafkaConsumer}, which is not imported in this listing -
 * presumably a project interface in the same package (verify).
 */
@Repository
public class KafkaConsumerImpl implements KafkaConsumer {
/**
 * Listener method for the topic {@code "topic-name"}, registered with listener id
 * {@code "group-consumer"} and using the container factory bean
 * {@code "kafkaListenerContainerFactory"}.
 *
 * @param datum the record passed to the listener, with a String key and Object value
 */
@Override
@KafkaListener(id = "group-consumer",
topics = "topic-name",
containerFactory = "kafkaListenerContainerFactory")
public void receiveFromKafka(ConsumerRecord<String, Object> datum) {
// Method body is elided in the original listing.
...
}
}
Я не могу понять, что я сделал не так или что упустил. Буду благодарен за любую помощь, потому что я не знаком с аутентификацией Kerberos.
Большое спасибо за вашу поддержку.