Kafka java потребительское рукопожатие SSL Ошибка: java .security.cert.CertificateException: нет альтернативных имен субъектов - PullRequest
1 голос
/ 28 марта 2020

Я запускаю kafka 2.13-2.4.1 и настраиваю SSL-соединение между клиентом kafka (потребителем), записанным в java, и кластером kafka (3 узла, каждый из которых имеет одного брокера). Я использовал официальную документацию через Confluent's Documentation , которая имеет одностороннюю аутентификацию (без сертификата для клиента), и она не работала, поэтому я вынужден использовать два способа аутентификации, а затем и консоли потребителя и производителя нормально общались через SSL, но когда я использую мое java потребительское приложение:

package kafkaconsumerssl;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaConsumerSSLTest {

    public static void main(String[] args) throws KafkaException {
        Properties props = new Properties();
                props.put("security.protocol", "SSL");
                props.put("ssl.endpoint.identification.algorithm=", "");
                props.put("ssl.truststore.location","/var/private/ssl/kafka.client.truststore.jks");
                props.put("ssl.truststore.password","*******");
                props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
                props.put("ssl.keystore.password", "********"); 
                props.put("ssl.key.password", "*******");
                props.put("acks", "all");
                props.put("retries", "0");

                props.setProperty("zk.connnect", "172.31.32.219:2181,172.31.41.226:2181,172.31.33.133:2181");
                props.setProperty("group.id", "ConsumersTest");
                props.setProperty("auto.offset.reset","earliest");
                props.setProperty("enable.auto.commit", "true");
                    props.setProperty("auto.commit.interval.ms", "1000");
                props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.setProperty("bootstrap.servers","172.31.41.226:9093,172.31.33.133:9093,172.31.32.219:9093");

                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList("testKafka"));
                ConsumerRecords<String,String> messages = consumer.poll(Duration.ofMillis(4000));
                System.out.printf("reading_records...\n");
                for (ConsumerRecord<String, String> record : messages) {
                            System.out.printf("offset= %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                consumer.close();
    }}

и мой файл конфигурации server.properties брокера kafka:

ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=*******
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=******
ssl.key.password=******
listeners=PLAINTEXT://172.31.41.226:9092,SSL://172.31.41.226:9093
advertised.listeners=SSL://172.31.41.226:9093,PLAINTEXT://172.31.41.226:9092
ssl.client.auth=required
#security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=

при запуске java -cp .:/opt/kafka/libs/* -Djavax.net.debug=ssl:handshake KafkaConsumerSSLTest2.java Я получаю следующую ошибку:

javax.net.ssl|ERROR|01|main|2020-03-27 22:55:31.527 UTC|TransportContext.java:312|Fatal (CERTIFICATE_UNKNOWN): No subject alternative names present (
"throwable" : {
  java.security.cert.CertificateException: No subject alternative names present
        at java.base/sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:137)
        at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:96)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:429)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:283)
        at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
        at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:623)
        at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:464)
        at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:360)
        at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
        at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1048)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:770)
        at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:995)
        at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:402)
        at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:484)
        at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:340)
        at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:265)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:170)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
        at kafkaconsumerssl.KafkaConsumerSSLTest.main(KafkaConsumerSSLTest2.java:40)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at jdk.compiler/com.sun.tools.javac.launcher.Main.execute(Main.java:404)
        at jdk.compiler/com.sun.tools.javac.launcher.Main.run(Main.java:179)
        at jdk.compiler/com.sun.tools.javac.launcher.Main.main(Main.java:119)}

)
javax.net.ssl|WARNING|01|main|2020-03-27 22:55:31.528 UTC|SSLEngineOutputRecord.java:168|outbound has closed, ignore outbound application data
[2020-03-27 22:55:31,529] ERROR [Consumer clientId=consumer-ConsumersTest-1, groupId=ConsumersTest] Connection to node 2147483645 (/172.31.32.219:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient:745)
Exception in thread "main" java.lang.IllegalArgumentException: 0 > -7
        at java.base/java.util.Arrays.copyOfRange(Arrays.java:4021)
        at java.base/java.util.Arrays.copyOfRange(Arrays.java:3981)
        at jdk.compiler/com.sun.tools.javac.launcher.Main.execute(Main.java:416)
        at jdk.compiler/com.sun.tools.javac.launcher.Main.run(Main.java:179)
        at jdk.compiler/com.sun.tools.javac.launcher.Main.main(Main.java:119)

1 Ответ

0 голосов
/ 28 марта 2020

Вы должны установить ssl.endpoint.identification.algorithm в пустую строку (пропустить =):

props.put("ssl.endpoint.identification.algorithm", "");

Обратите внимание, что если вышеуказанное решает проблему, это означает, что сертификат не соответствует имя хоста машины, которую вы используете для запуска потребителя.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...