У меня есть простой автономный разъем S3. Вот соответствующая часть свойств рабочей конфигурации:
plugin.path = <plugins directory>
bootstrap.servers = <List of servers on Amazon MKS>
security.protocol = SSL
...
Он отлично работает, когда я подключаю его к локально работающей Kafka. Однако, когда я подключаю его к брокеру Kafka по AWS (с SSL), он ничего не потребляет. Нет ошибок, ничего. Как будто topi c был пуст:
[2020-01-30 10:50:03,597] INFO Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:116)
[2020-01-30 10:50:03,598] INFO WorkerSinkTask{id=xxx} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:302)
Когда я включил режим DEBUG в connect-log4j.properties, я начал видеть множество сообщений об ошибках:
Completed connection to node -2. Fetching API versions. (org.apache.kafka.clients.NetworkClient:914)
Initiating API versions fetch from node -2. (org.apache.kafka.clients.NetworkClient:928)
Connection with YYY disconnected (org.apache.kafka.common.network.Selector:607)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
...
Node -2 disconnected. (org.apache.kafka.clients.NetworkClient:894)
Initialize connection to node XXX (id: -3 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1125)
Initiating connection to node XXX (id: -3 rack: null) using address XXX (org.apache.kafka.clients.NetworkClient:956)
Я что-то не хватает в конфигурации SSL? Обратите внимание, что созданные вручную org.apache.kafka.clients.consumer.KafkaConsumer
s могут успешно считывать данные из этой топи c, задав только «security.protocol = SSL».
EDIT : Вот свойства соединителя:
name = my-connector
connector.class = io.confluent.connect.s3.S3SinkConnector
topics = some_topic
timestamp.extractor = Record
locale = de_DE
timezone = UTC
storage.class = io.confluent.connect.s3.storage.S3Storage
partitioner.class = io.confluent.connect.storage.partitioner.HourlyPartitioner
format.class = io.confluent.connect.s3.format.bytearray.ByteArrayFormat
s3.bucket.name = some-s3-bucket
s3.compression.type = gzip
flush.size = 3
s3.region = eu-central-1