How to configure Confluent Kafka Connect for a local S3 sink
Asked 17 March 2020

I have the following docker-compose file:

version: '3.3'

services:

  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_NUM_PARTITIONS: 12
      KAFKA_COMPRESSION_TYPE: gzip
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
      KAFKA_JMX_PORT: 9091
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
      - zookeeper
    ports:
      - 9092:9092

  connect:
    image: confluentinc/cp-kafka-connect
    depends_on:
      - kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_REST_LISTENERS: http://0.0.0.0:8083
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_CONFIG_STORAGE_TOPIC: __connect-config
      CONNECT_OFFSET_STORAGE_TOPIC: __connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: __connect-status
      CONNECT_GROUP_ID: "kafka-connect"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_PLUGIN_PATH: '/usr/share/java/'
      AWS_ACCESS_KEY_ID:  "1234"
      AWS_SECRET_ACCESS_KEY: "1234"
    ports:
      - 8083:8083

  s3:
    image: vladnev/fake-s3
    environment:
      AWS_ACCESS_KEY_ID:  "1234"
      AWS_SECRET_ACCESS_KEY: "1234"

and the following configuration for creating the S3 sink connector:

{
  "name": "s3-sink",
  "config": {

    "_comment": "The S3 sink connector class",
    "connector.class":"io.confluent.connect.s3.S3SinkConnector",

    "_comment": "The total number of Connect tasks to spawn (with implicit upper limit the number of topic-partitions)",
    "tasks.max":"1",

    "_comment": "Which topics to export to S3",
    "topics":"__connect-config",

    "_comment": "The S3 bucket that will be used by this connector instance",
    "s3.bucket.name":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",

    "_comment": "The AWS region where the S3 bucket is located",
    "s3.region":"us-west-2",

    "_comment": "The size in bytes of a single part in a multipart upload. The last part is of s3.part.size bytes or less. This property does not affect the total size of an S3 object uploaded by the S3 connector",
    "s3.part.size":"5242880",

    "_comment": "The maximum number of Kafka records contained in a single S3 object. Here a high value to allow for time-based partition to take precedence",
    "flush.size":"100000",

    "_comment": "Kafka Connect converter used to deserialize keys (unused in this example)",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"false",

    "_comment": "Kafka Connect converter used to deserialize values",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable":"false",

    "_comment": "The type of storage for this storage cloud connector",
    "storage.class":"io.confluent.connect.s3.storage.S3Storage",

    "_comment": "The storage format of the objects uploaded to S3",
    "format.class":"io.confluent.connect.s3.format.json.JsonFormat",

    "_comment": "Schema compatibility mode between records with schemas (Useful when used with schema-based converters. Unused in this example, listed for completeness)",
    "schema.compatibility":"NONE",

    "_comment": "The class used to partition records in objects to S3. Here, partitioning based on time is used.",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",

    "_comment": "The locale used by the time-based partitioner to encode the date string",
    "locale":"en",

    "_comment": "Setting the timezone of the timestamps is also required by the time-based partitioner",
    "timezone":"UTC",

    "_comment": "The date-based part of the S3 object key",
    "path.format":"'date'=YYYY-MM-dd/'hour'=HH",

    "_comment": "The duration that aligns with the path format defined above",
    "partition.duration.ms":"3600000",

    "_comment": "The interval between timestamps that is sufficient to upload a new object to S3. Here a small interval of 1min for better visualization during the demo",
    "rotate.interval.ms":"60000",

    "_comment": "The class to use to derive the timestamp for each record. Here Kafka record timestamps are used",
    "timestamp.extractor":"Record"
  }
}
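
I plan to register this configuration against the Connect worker's REST API (port 8083 is published in the compose file), roughly like this, assuming the JSON above is saved as s3-sink.json:

curl -X POST -H "Content-Type: application/json" \
     --data @s3-sink.json \
     http://localhost:8083/connectors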

I have not been able to test this setup against a real S3 bucket yet, so it may not be entirely correct, and I will probably need to adjust a few more things before trying it on AWS. But for now, purely locally, what do I need to change so that the connector sends messages to the fake s3 container? I think I need to change the s3.bucket.name and s3.region properties, but I don't know how. Or is this perhaps not possible at all?
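
My best guess so far, purely as a sketch: keep the config above but point the connector at the fake-s3 endpoint via the store.url property, which the S3 sink connector appears to support for non-AWS object stores. I am assuming here that the vladnev/fake-s3 container listens on fake-s3's usual default port 4569 and is reachable from the connect container under the compose service name s3; the bucket name and region would then just be placeholder values:

{
  "name": "s3-sink",
  "config": {
    "_comment": "Everything else stays as in the config above",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",

    "_comment": "Point the connector at the local fake-s3 container instead of AWS (port 4569 assumed)",
    "store.url": "http://s3:4569",

    "_comment": "With a fake endpoint these are presumably just placeholders",
    "s3.bucket.name": "local-test-bucket",
    "s3.region": "us-west-2"
  }
}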

Thank you very much for your help! :)
