Рукопожатие Kafka SSL не удалось в пользовательском Java производитель - PullRequest
0 голосов
/ 04 августа 2020

Пытаюсь создать некоторые данные с помощью моего приложения-производителя Kafka, но я получаю сообщение об ошибке ниже:

[SocketServer brokerId = 0] Неудачная аутентификация с localhost / 127.0.0.1 (ошибка подтверждения SSL) (org . apache .kafka.common.network.Selector)

Я использую протокол SASL_SSL с механизмом PLAIN для связи с Kafka. Когда я использую kafka-console-производителя sh kafka-console-producer.sh --broker-list localhost:9093 --topic kafka-topic --producer.config ../config/producer.properties

и kafka-console-consumer sh kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic kafka-topic --consumer.config ../config/consumer.properties

, все работает нормально. Вот часть моих server.properties :

listeners=PLAINTEXT://localhost:9092,SASL_SSL://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9092,SASL_SSL://localhost:9093

listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="admin" \
   password="admin-secret" \
   user_admin="admin-secret";

sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.inter.broker.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

allow.everyone.if.no.acl.found=true

ssl.keystore.location=/mnt/data/kafka/config/keystore/kafka.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/mnt/data/kafka/config/truststore/kafka.truststore.jks
ssl.truststore.password=password

ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.secure.random.implementation=SHA1PRNG

Producer.properties

bootstrap.servers=localhost:9093
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="admin" \
   password="admin-secret" \
   user_admin="admin-secret";
ssl.truststore.location=/mnt/data/kafka/config/truststore/kafka.truststore.jks
ssl.truststore.password=password

consumer.properties

bootstrap.servers=localhost:9093
group.id=test-consumer-group
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="admin" \
   password="admin-secret" \
   user_admin="admin-secret";

ssl.truststore.location=/mnt/data/kafka/config/truststore/kafka.truststore.jks
ssl.truststore.password=password

А вот и мое Java приложение производителя Kafka

private KafkaProducer<String, String> producer;
    private String address;
    private final int BATCH_SIZE = 16384 * 4;

    private Properties setProperties() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, address);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 200);
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.put("acks", "all");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\" user_admin=\"admin-secret\";");
        properties.put("ssl.truststore.location", "/mnt/data/kafka/config/truststore/kafka.truststore.jks");
        properties.put("ssl.truststore.password", "password");
        return properties;
    }

    public void createTopicWithPartitions(String topicName, int partitionsCount) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, address);
        AdminClient adminClient = AdminClient.create(properties);

        boolean isTopicExists = adminClient.listTopics().names().get().stream()
                .anyMatch(name -> name.equals(topicName));

        if (isTopicExists) {
            System.out.println("Topic already exists");
        } else {
            NewTopic newTopic = new NewTopic(topicName, partitionsCount, (short) 1);
            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
        }

        adminClient.close();
    }

    public void sendMessages(String topicName, String payload, int messagesCount) {
        for (int i = 0; i < messagesCount; i++) {
            String partitionKey = DataUtils.generateSourceDeviceId(15).toUpperCase();
            producer.send(new ProducerRecord<>(topicName, partitionKey, payload));
        }
    }

    public KafkaMessagesProducer(String address) {
        this.address = address;
        this.producer = new KafkaProducer<>(setProperties());
    }

    public int getBATCH_SIZE() {
        return BATCH_SIZE;
    }

Как я описал ранее, производитель / потребитель консоли работает нормально, мое приложение Java получает ошибку установления связи SSL, но с выключенным протоколом SASL_SSL мое приложение Java работает нормально.

UPD : инструмент для создания сертификатов, используемый с этого веб-сайта: https://github.com/confluentinc/confluent-platform-security-tools/blob/master/kafka-generate-ssl.sh

1 Ответ

0 голосов
/ 05 августа 2020

У меня возникла проблема с методом createTopicWithPartitions. Я переписал свойства, созданные в методе setProperties (), с помощью (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, address)

...