Я пытаюсь проверить потребителя kafka, используя данные из удаленного кластера Kafka.Я получаю следующую ошибку при использовании kafka-console-consumer.sh
:
ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
java.lang.IllegalStateException: No entry found for connection 2147475658
at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:885)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:276)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:655)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:635)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:436)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages
Вот команда, которую я использую:
./bin/kafka-console-consumer.sh --bootstrap-server SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT} --consumer.config ./config/consumer.properties --topic MYTOPIC --group MYGROUP
Вот файл ./config/consumer.properties
:
bootstrap.servers=SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT}
# consumer group id
group.id=MYGROUP
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
auto.offset.reset=earliest
#### Security
security.protocol=SSL
ssl.key.password=test1234
ssl.keystore.location=/opt/kafka/config/certs/keystore.jks
ssl.keystore.password=test1234
ssl.truststore.location=/opt/kafka/config/certs/truststore.jks
ssl.truststore.password=test1234
Ты хоть представляешь, в чем проблема?