SpringBoot - Kafka - Kerberos - Не удалось создать потребителя Kafka с файлом keytab Kerberos - PullRequest
0 голосов
/ 08 июля 2020

Я пытаюсь реализовать потребителя Kafka с аутентификацией Kerberos, но уже несколько дней сталкиваюсь с разными проблемами, которые приводят к разным исключениям. Текущая ситуация такова: я разрабатываю приложение Spring Boot, которое должно потреблять данные из топика Kafka в сети. Этот топик Kafka требует, чтобы потребители и производители входили в систему с помощью Kerberos. Разработка ведётся на Windows.

На данный момент я получаю следующую ошибку:

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-07-08 10:44:12.590 ERROR 11336 --- [           main] o.s.boot.SpringApplication               : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE]
    at com.kafka.kerberos.auth.AuthApplication.main(AuthApplication.java:10) [classes/:na]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631) ~[kafka-clients-2.5.0.jar:na]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:595) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:295) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257) ~[spring-kafka-2.5.3.RELEASE.jar:2.5.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    ... 14 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Unable to obtain password from user

    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:743) ~[kafka-clients-2.5.0.jar:na]
    ... 28 common frames omitted
Caused by: javax.security.auth.login.LoginException: Unable to obtain password from user

    at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Unknown Source) ~[na:1.8.0_251]
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Unknown Source) ~[na:1.8.0_251]
    at com.sun.security.auth.module.Krb5LoginModule.login(Unknown Source) ~[na:1.8.0_251]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_251]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_251]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_251]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_251]
    at javax.security.auth.login.LoginContext.invoke(Unknown Source) ~[na:1.8.0_251]
    at javax.security.auth.login.LoginContext.access$000(Unknown Source) ~[na:1.8.0_251]
    at javax.security.auth.login.LoginContext$4.run(Unknown Source) ~[na:1.8.0_251]
    at javax.security.auth.login.LoginContext$4.run(Unknown Source) ~[na:1.8.0_251]
    at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_251]
    at javax.security.auth.login.LoginContext.invokePriv(Unknown Source) ~[na:1.8.0_251]
    at javax.security.auth.login.LoginContext.login(Unknown Source) ~[na:1.8.0_251]
    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:103) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112) ~[kafka-clients-2.5.0.jar:na]
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158) ~[kafka-clients-2.5.0.jar:na]
    ... 32 common frames omitted

В папку ресурсов приложения Spring Boot я поместил свой файл keytab, файл конфигурации Kerberos krb5.conf и сертификат JKS. Эти файлы были предоставлены администратором топика Kafka, к которому я пытаюсь получить доступ.

Мой файл krb5.conf:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  doNotPrompt=true
  keyTab="my-keytab.keytab"
  kerberos_realm = "myRealm@kerberos"
  principal="myPrincipla@kerberos"
  ssl.endpoint.identification.algorithm=""
  network.auth.use-sspi=false;
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  useTicketCache=false
  principal="myPrincipla@kerberos"
  doNotPrompt=true
  keyTab="my-keytab.keytab"
  kerberos_realm = "myRealm@kerberos"
  debug=true
  ssl.endpoint.identification.algorithm=""
  network.auth.use-sspi=false;
};

В качестве конфигурации для моего потребителя я использую следующий код:

package com.kafka.kerberos.auth;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@PropertySource("classpath:kafka.properties")
public class ConsumerConfiguration {
    
    @Value("${kafka.consumer.enable.topic.auto-creation}")
    private Boolean autoCreationTopic;
    @Value("${kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String consumerAutoOffsetReset;
    @Value("${kafka.consumer.group.id}")
    public String consumerGroupId;
    @Value("${kafka.consumer.client.id}")
    public String consumerClientId;
    @Value("${kafka.consumer.enable.auto.commit}")
    public Boolean consumerAutoCommitFlag;
    @Value("${kafka.consumer.commit.sync}")
    public Boolean commitSync;
    
    // Kerberos
    @Value("${kafka.consumer.sasl.mechanism}")
    private String saslMechanism;
    @Value("${kafka.consumer.security.protocol}")
    private String securityProtocol;
    @Value("${kafka.service.name}")
    private String kerberosServiceName;
    
    @Bean
    public Map<String, Object> consumerConfigs() throws ClassNotFoundException, IOException {
        
        ClassPathResource theFile = new ClassPathResource("krb5.conf");
        ClassPathResource theFileJKS = new ClassPathResource("truststore.jks");
        System.setProperty("java.security.auth.login.config", theFile.getURL().getPath());
        
        Map<String, Object> consumerProperties = new HashMap<>();
        
        consumerProperties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, autoCreationTopic);
        
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
        consumerProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerAutoCommitFlag);

        consumerProperties.put("security.protocol", securityProtocol); /* here I use "SASL_SSL" */
        consumerProperties.put("sasl.mechanism", saslMechanism); /* here I use "GSSAPI" */
        consumerProperties.put("sasl.kerberos.service.name", kerberosServiceName); /* here I use "kafka" */
        
        consumerProperties.put("ssl.truststore.location", theFileJKS.getURL().getPath().substring(1));
        consumerProperties.put("ssl.truststore.password","my-password-for-jks"); 

        return consumerProperties;
    }
    
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() throws ClassNotFoundException, IOException {
        try {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        } catch (IOException e) {}
        return null;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() throws ClassNotFoundException, IOException {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
    
}

А для реализации потребителя у меня есть следующий код:

package com.kafka.kerberos.auth;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Repository;

/**
 * Kafka listener that receives records from the topic {@code topic-name}.
 *
 * <p>NOTE(review): {@code KafkaConsumer} is not imported here, so it is presumably a
 * project interface in this package rather than the Kafka client class; confirm.
 * NOTE(review): the class is registered with {@code @Repository}; confirm a persistence
 * stereotype is intended for a message listener.
 */
@Repository
public class KafkaConsumerImpl implements KafkaConsumer {
        
    /**
     * Handles one record delivered by the listener container created through the
     * {@code kafkaListenerContainerFactory} bean.
     *
     * <p>NOTE(review): the listener {@code id} may also be used as the consumer group id
     * by spring-kafka, overriding the configured group; verify against the
     * {@code @KafkaListener} documentation.
     *
     * @param datum the consumed record
     */
    @Override
    @KafkaListener(id = "group-consumer", 
                    topics = "topic-name",
                    containerFactory = "kafkaListenerContainerFactory")
    public void receiveFromKafka(ConsumerRecord<String, Object> datum) {
        // Body elided in the original listing.
        ...
    }
}

Я не могу понять, что я сделал не так и что упустил. Буду благодарен за любую помощь, потому что я не знаком с аутентификацией Kerberos.

Большое спасибо за вашу поддержку.

...