I have the following docker-compose file:
version: '3.3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_NUM_PARTITIONS: 12
      KAFKA_COMPRESSION_TYPE: gzip
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
      KAFKA_JMX_PORT: 9091
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
  connect:
    image: confluentinc/cp-kafka-connect
    depends_on:
      - kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_REST_LISTENERS: http://0.0.0.0:8083
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_CONFIG_STORAGE_TOPIC: __connect-config
      CONNECT_OFFSET_STORAGE_TOPIC: __connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: __connect-status
      CONNECT_GROUP_ID: "kafka-connect"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_PLUGIN_PATH: '/usr/share/java/'
      AWS_ACCESS_KEY_ID: "1234"
      AWS_SECRET_ACCESS_KEY: "1234"
    ports:
      - 8083:8083
  s3:
    image: vladnev/fake-s3
    environment:
      AWS_ACCESS_KEY_ID: "1234"
      AWS_SECRET_ACCESS_KEY: "1234"
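To bring the stack up and sanity-check that the S3 sink plugin is actually visible to the Connect worker, I do roughly the following (connector-plugins is part of Connect's standard REST API; the exact output of course depends on the image version):

    # start all services in the background
    docker-compose up -d

    # once the Connect worker is up, check that the S3 sink connector class is on the plugin path
    curl -s http://localhost:8083/connector-plugins | grep -i s3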
And here is the configuration I use to create the S3 sink connector:
{
  "name": "s3-sink",
  "config": {
    "_comment": "The S3 sink connector class",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "_comment": "The total number of Connect tasks to spawn (with implicit upper limit the number of topic-partitions)",
    "tasks.max": "1",
    "_comment": "Which topics to export to S3",
    "topics": "__connect-config",
    "_comment": "The S3 bucket that will be used by this connector instance",
    "s3.bucket.name": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
    "_comment": "The AWS region where the S3 bucket is located",
    "s3.region": "us-west-2",
    "_comment": "The size in bytes of a single part in a multipart upload. The last part is of s3.part.size bytes or less. This property does not affect the total size of an S3 object uploaded by the S3 connector",
    "s3.part.size": "5242880",
    "_comment": "The maximum number of Kafka records contained in a single S3 object. Here a high value to allow for time-based partition to take precedence",
    "flush.size": "100000",
    "_comment": "Kafka Connect converter used to deserialize keys (unused in this example)",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "_comment": "Kafka Connect converter used to deserialize values",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "_comment": "The type of storage for this storage cloud connector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "_comment": "The storage format of the objects uploaded to S3",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "_comment": "Schema compatibility mode between records with schemas (useful when used with schema-based converters; unused in this example, listed for completeness)",
    "schema.compatibility": "NONE",
    "_comment": "The class used to partition records in objects to S3. Here, partitioning based on time is used.",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "_comment": "The locale used by the time-based partitioner to encode the date string",
    "locale": "en",
    "_comment": "Setting the timezone of the timestamps is also required by the time-based partitioner",
    "timezone": "UTC",
    "_comment": "The date-based part of the S3 object key",
    "path.format": "'date'=YYYY-MM-dd/'hour'=HH",
    "_comment": "The duration that aligns with the path format defined above",
    "partition.duration.ms": "3600000",
    "_comment": "The interval between timestamps that is sufficient to upload a new object to S3. Here a small interval of 1min for better visualization during the demo",
    "rotate.interval.ms": "60000",
    "_comment": "The class to use to derive the timestamp for each record. Here Kafka record timestamps are used",
    "timestamp.extractor": "Record"
  }
}
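For completeness, I register the connector through the Connect worker's REST API, assuming the JSON above is saved as s3-sink.json on my machine:

    # create the connector on the worker listening on localhost:8083
    curl -s -X POST -H "Content-Type: application/json" \
         --data @s3-sink.json \
         http://localhost:8083/connectors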
I haven't been able to test this setup against a real S3 bucket yet, so it may not be entirely correct, and I will probably need to configure something else if I want to try it on AWS. But for now, just locally, what should I change so that it sends messages to the fake s3 container? I think I need to change the s3.bucket.name and s3.region properties, but I don't know how. Or maybe it isn't possible at all?
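For illustration, this is the kind of change to the "config" block I was imagining, though I have not verified that the connector supports it; both the property name (store.url, which I understand is intended for S3-compatible storage endpoints) and the fake-s3 port are guesses on my part:

    "_comment": "Guess: point the connector at the fake-s3 container instead of AWS; property name and port unverified",
    "store.url": "http://s3:4569",
    "s3.bucket.name": "my-local-bucket",
    "s3.region": "us-west-2"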
Thank you very much for your help! :)