Я столкнулся с проблемой при попытке репликации из одного сливного облачного кластера в другой. Я слежу за объединенной документацией о том, как это сделать, однако у меня постоянно возникает сбой, который, как я полагаю, вызван ошибкой в моей конфигурации.
Конфигурация выглядит следующим образом:
{
"name": "kafka_topics_replication",
"config": {
"name": "kafka_topics_replication",
"connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
"topic.whitelist": "topics",
"src.kafka.bootstrap.servers": "source-broker:9092",
"src.kafka.security.protocol": "SASL_SSL",
"src.kafka.security.mechanism": "PLAIN",
"src.kafka.client.id": "src-to-dst-replicator",
"src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"src-username\" password=\"src-password\" serviceName=\"Kafka\";",
"confluent.topic.replication.factor": "3",
"dest.topic.replication.factor": "3",
"dest.kafka.bootstrap.servers": "dest-broker.cloud:9092",
"dest.kafka.security.protocol": "SASL_SSL",
"dest.kafka.sasl.mechanism": "PLAIN",
"dest.kafka.client.id": "src-to-dst-replicator",
"dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"dst-username\" password=\"dst-password\" serviceName=\"Kafka\";"
},
"tasks": [],
"type": "source"
}
Соединитель запускается, но продолжает регистрировать следующую ошибку:
[2020-07-14 14:45:15,568] WARN [kafka_topics_replication|worker] [AdminClient clientId=src-to-dst-replicator] Error connecting to node source-broker:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:969)
java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]
at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:348)
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:964)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1018)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1260)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1203)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:228)
at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
... 8 more
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.firstPrincipal(SaslClientAuthenticator.java:616)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:200)
at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:274)
at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:216)
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:142)
at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:224)
at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:964)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1018)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1260)
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1203)
at java.lang.Thread.run(Thread.java:748)
Поиск в Google на самом деле не обнаружил ничего, что могло бы быть применимо к этому сценарию. Скрестив пальцы, чтобы кто-то здесь мог хотя бы указать мне в правильном направлении.
Спасибо, -Райан