Kafka Connect AWS S3 коннектор мойки не читает из темы - PullRequest
1 голос
/ 30 января 2020

У меня есть простой автономный разъем S3. Вот соответствующая часть свойств рабочей конфигурации:

plugin.path = <plugins directory>
bootstrap.servers = <List of servers on Amazon MKS>
security.protocol = SSL
...

Он отлично работает, когда я подключаю его к локально работающей Kafka. Однако, когда я подключаю его к брокеру Kafka по AWS (с SSL), он ничего не потребляет. Нет ошибок, ничего. Как будто topi c был пуст:

[2020-01-30 10:50:03,597] INFO Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:116)
[2020-01-30 10:50:03,598] INFO WorkerSinkTask{id=xxx} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:302)

Когда я включил режим DEBUG в connect-log4j.properties, я начал видеть множество сообщений об ошибках:

Completed connection to node -2. Fetching API versions. (org.apache.kafka.clients.NetworkClient:914)
Initiating API versions fetch from node -2. (org.apache.kafka.clients.NetworkClient:928)
Connection with YYY disconnected (org.apache.kafka.common.network.Selector:607)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
...
Node -2 disconnected. (org.apache.kafka.clients.NetworkClient:894)
Initialize connection to node XXX (id: -3 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1125)
Initiating connection to node XXX (id: -3 rack: null) using address XXX (org.apache.kafka.clients.NetworkClient:956)

Я что-то не хватает в конфигурации SSL? Обратите внимание, что созданные вручную org.apache.kafka.clients.consumer.KafkaConsumer s могут успешно считывать данные из этой топи c, задав только «security.protocol = SSL».

EDIT : Вот свойства соединителя:

name = my-connector
connector.class = io.confluent.connect.s3.S3SinkConnector
topics = some_topic
timestamp.extractor = Record
locale = de_DE
timezone = UTC
storage.class = io.confluent.connect.s3.storage.S3Storage
partitioner.class = io.confluent.connect.storage.partitioner.HourlyPartitioner
format.class = io.confluent.connect.s3.format.bytearray.ByteArrayFormat
s3.bucket.name = some-s3-bucket
s3.compression.type = gzip
flush.size = 3
s3.region = eu-central-1

1 Ответ

1 голос
/ 30 января 2020

У меня была похожая проблема, которая была решена после того, как я дополнительно указал протокол безопасности для потребителя (помимо глобального): просто добавьте

consumer.security.protocol = SSL

В свойствах конфигурации

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...