Я пытаюсь настроить kafka-connect для отправки моих данных с kafka на s3. Я новичок ie в аспекте kafka, и я пытаюсь реализовать этот поток без каких-либо ssl-шифрований, просто чтобы освоить его.
kafka version : 2.12-2.2.0
kafka-connect : 4.1.1 (https://api.hub.confluent.io/api/plugins/confluentinc/kafka-connect-s3/versions/4.1.1/archive)
На сервере kafka s
. свойства file the only change that I did is setting the
advertised.listeners` к моему ec2 ip:
advertised.listeners=PLAINTEXT://ip:9092
kafka-connect свойства:
# Kafka broker IP addresses to connect to
bootstrap.servers=localhost:9092
# Path to directory containing the connector jar and dependencies
plugin.path=/root/kafka_2.12-2.2.0/plugins/
# Converters to use to convert keys and values
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
security.protocol=SASL_PLAINTEXT
consumer.security.protocol=SASL_PLAINTEXT
мой s3-sink.properties
файл:
name=s3.sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=my_topic
s3.region=us-east-1
s3.bucket.name=my_bucket
s3.part.size=5242880
flush.size=3
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
Я запускаю kafka-connect с помощью следующей команды:
connect-standalone.sh kafka-connect.properties s3-sink.properties
Сначала я получил следующую ошибку:
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
Из других сообщений я увидел, что мне нужно создать Конфигурационный файл jaas, так что я сделал:
cat config/kafka_server_jass.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="userName"
serviceName="kafka"
password="password";
};
и:
export KAFKA_OPTS="-Djava.security.auth.login.config=/root/kafka_2.12-2.2.0/config/kafka_server_jass.conf"
Теперь я получаю следующую ошибку:
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login
help :)