Не удалось подключиться и описать кластер Kafka. Апач кафка коннект - PullRequest
0 голосов
/ 22 октября 2019

Я настроил кластер MSK в aws и создал экземпляр EC2 в том же vpn.

Я попробовал kafka-console-consumer.sh и kafka-console-producer.sh, и он работает нормально. Мне удалось увидеть сообщения, отправленные производителем в качестве потребителя

1) Я загрузил разъем s3 (https://docs.confluent.io/current/connect/kafka-connect-s3/index.html)

2), распаковал файлы в / home / ec2-user /plugins /

3) Создан connect-standalone.properties со следующим содержимым

bootstrap.servers=<my brokers>
plugin.path=/home/ec2-user/kafka-plugins
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets

4) Создан s3-sink.properties со следующим содержимым.

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=<My Topic>
s3.region=us-east-1
s3.bucket.name=vk-ingestion-dev
s3.part.size=5242880
flush.size=1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE

КогдаЯ запускаю connect-standlone.sh с двумя вышеуказанными файлами проп, он ждет некоторое время и выдает следующую ошибку:

[AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:237)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
[2019-10-22 19:28:36,789] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:237)
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.
[2019-10-22 19:28:36,796] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:124)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:45)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:81)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:58)
    ... 2 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

Есть ли какие-то проблемы с безопасностью, которые мне нужно искать?

1 Ответ

1 голос
/ 23 октября 2019

После добавления следующей конфигурации ssl все заработало.

security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks

После добавления вышеуказанных параметров, соединитель запустился без ошибок, но данные не загружаются в s3.

Добавление производителя иКонфигурация потребителя config отдельно работает.

Пример:

producer.security.protocol=SSL
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks

consumer.security.protocol=SSL
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
...