Confluent Control Center не перехватывает поток - PullRequest
0 голосов
/ 14 сентября 2018

Я использую CCC с потоком Кафки, который заполняется соединителем Postgres из Debezium.

Я использую следующее docker-compose.yml:

version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-1
    container_name: zookeeper-1
    volumes:
    - /path/to/something/zk1/zk-data:/var/lib/zookeeper/data
    - /path/to/something/zk1/zk-txn-logs:/var/lib/zookeeper/log
    ports:
    - 22181:22181
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888

  zookeeper-2:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-2
    container_name: zookeeper-2
    volumes:
    - /path/to/something/zk2/zk-data:/var/lib/zookeeper/data
    - /path/to/something/zk2/zk-txn-logs:/var/lib/zookeeper/log
    ports:
    - 32181:32181
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888

  zookeeper-3:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-3
    container_name: zookeeper-3
    volumes:
    - /path/to/something/zk3/zk-data:/var/lib/zookeeper/data
    - /path/to/something/zk3/zk-txn-logs:/var/lib/zookeeper/log
    ports:
    - 42181:42181
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 42181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888

  kafka-1:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: kafka-1
    container_name: kafka-1
    volumes:
    - /path/to/something/kafka1/kafka-data:/var/lib/kafka/data
    ports:
    - 19092:19092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.71:19092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_REPLICA_FETCH_MAX_BYTES: 3145728
      KAFKA_MESSAGE_MAX_BYTES: 3145728
      KAFKA_PRODUCER_MAX_REQUEST_SIZE: 3145728
      KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES: 3145728    
    depends_on:
    - zookeeper-1
    - zookeeper-2
    - zookeeper-3

  kafka-2:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: kafka-2
    container_name: kafka-2
    volumes:
    - /path/to/something/kafka2/kafka-data:/var/lib/kafka/data
    ports:
    - 19093:19093
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.71:19093
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_REPLICA_FETCH_MAX_BYTES: 3145728
      KAFKA_MESSAGE_MAX_BYTES: 3145728
      KAFKA_PRODUCER_MAX_REQUEST_SIZE: 3145728
      KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES: 3145728
    depends_on:
    - zookeeper-1
    - zookeeper-2
    - zookeeper-3

  kafka-3:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: kafka-3
    container_name: kafka-3
    volumes:
    - /path/to/something/kafka3/kafka-data:/var/lib/kafka/data
    ports:
    - 19094:19094
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.71:19094
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_REPLICA_FETCH_MAX_BYTES: 3145728
      KAFKA_MESSAGE_MAX_BYTES: 3145728
      KAFKA_PRODUCER_MAX_REQUEST_SIZE: 3145728
      KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES: 3145728
    depends_on:
    - zookeeper-1
    - zookeeper-2
    - zookeeper-3

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    hostname: schema-registry
    container_name: schema-registry
    ports:
    - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181

  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - schema-registry
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
    - "8083:8083"
    volumes:
    - /path/to/something/postgres-source-connector:/usr/share/java/postgres-source-connector
    - /path/to/something/mongodb-sink-connector:/usr/share/java/mongodb-sink-connector
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java'
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.0.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PRODUCER_MAX_REQUEST_SIZE: 3145728
      CONNECT_CONSUMER_MAX_PARTITION_FETCH_BYTES: 3145728

  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    hostname: control-center
    container_name: control-center
    depends_on:
      - schema-registry
      - connect
      - ksql-server
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
    - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      CONTROL_CENTER_CONNECT_CLUSTER: 'http://connect:8083'
      CONTTROL_CENTER_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONTROL_CENTER_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"
      CONTROL_CENTER_CONNECT_CLUSTER: "http://connect:8083"
      CONTROL_CENTER_KSQL_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "https://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      CONTROL_CENTER_CUB_KAFKA_TIMEOUT: 300
      PORT: 9021

  ksql-server:
    image: confluentinc/cp-ksql-server:latest
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
    - connect
    ports:
    - "8088:8088"
    environment:
      KSQL_CUB_KAFKA_TIMEOUT: 300
      KSQL_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_COMMIT_INTERVAL_MS: 2000
      KSQL_KSQL_CACHE_MAX_BYTES_BUFFERING: 10000000
      KSQL_KSQL_AUTO_OFFSET_RESET: earliest

  ksql-cli:
    image: confluentinc/cp-ksql-cli:latest
    hostname: ksql-cli
    container_name: ksql-cli
    depends_on:
    - connect
    - ksql-server
    entrypoint: /bin/sh
    tty: true

  rest-proxy:
    image: confluentinc/cp-kafka-rest:latest
    hostname: rest-proxy
    container_name: rest-proxy
    depends_on:
    - schema-registry
    ports:
    - 8082:8082
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'

  postgres:
    image: debezium/postgres
    hostname: postgres
    container_name: postgres
    volumes:
      - /path/to/something/postgres:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: admin
      POSTGRES_DB: some-db
    ports:
      - 5432:5432

I 'я сопоставил Postgres Connector с Kafka Connect (через volumes в Compose) и могу видеть его в CCC при создании нового Source Connector.

Когда я создаю Source source, я вижу сообщение журнала, указывающеечто тема для этого разъема была создана.Я также вижу эту тему в области подключения CCC.Я также вижу, что Connect может аутентифицироваться в Postgres через этот Connector.

Когда я вносю изменения в таблицу, указанную в Connector, я вижу, как Кафка (у меня кластер из 3) выясняет, ктособирается сохранить это сообщение.Это означает, что журнал Postgres tx создал сообщение соответствующей темы в ответ на мое изменение, поэтому БД, Connector и Kafka работают правильно.

Однако, независимо от того, что я делаю, я не могу передать это событиеотображать в Data Streams или System Health (ни в областях > Topics, ни > Brokers) (редактировать: теперь это работает. Потоки данных по-прежнему нет).

I 'Я в растерянности за то, что идет не так.Единственное указание, которое я получаю, - это начальное сообщение:

Двойная проверка, чтобы убедиться, что перехватчики мониторинга были правильно настроены для любых клиентов, производящих или потребляющих из кластера controlcenter.cluster

У меня сложилось впечатление, что это, по сути, означает, что мой контейнер Control Center сконфигурирован с *_INTERCEPTOR_CLASSES, который я вставил выше.Я перешел по ссылке в этом сообщении, которая ведет вас на их сайт документации, где написано, что нужно проверить ответ веб-службы, предоставляющей данные kafka.Как следует из их документации, я получаю ответ всего {}, указывающий, что Кафка говорит, что у него нет данных.Но это определенно так.

Он пытается сказать, что мне также нужно как-то настроить эти перехватчики в Соединителе?Я не знаю, что значит иметь отслеживающие перехватчики для любых потребителей / производителей - у меня нет никаких сырых потребителей / производителей Java (пока) ... пока только исходные соединители.

Мой соединительКонфигурация выглядит следующим образом (создается через пользовательский интерфейс CCC), если это имеет значение:

{
  "database.server.name": "my-namespace",
  "database.dbname": "my-database",
  "database.hostname": "my-hostname",
  "database.port": "5432",
  "database.user": "admin",
  "schema.whitelist": "public",
  "table.whitelist": "my-database.my-table",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "name": "my-connector",
  "database.password": "its correct"
}

При запуске всех служб я вижу следующее в соответствующих журналах, которые, как я подозреваю, могут представлять интерес (ни в коем случаезаказ ниже):

control-center     | 2018-09-17T20:45:02.748463792Z     interceptor.classes = []
kafka-2            | 2018-09-17T20:44:56.293701931Z     interceptor.classes = []
schema-registry    | 2018-09-17T20:45:34.658065846Z     interceptor.classes = []
connect            | 2018-09-17T20:48:52.628218936Z [2018-09-17 20:48:52,628] WARN The configuration 'producer.interceptor.classes' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
connect            | 2018-09-17T20:48:52.628472218Z [2018-09-17 20:48:52,628] WARN The configuration 'consumer.interceptor.classes' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)

Любая помощь приветствуется.Спасибо!

1 Ответ

0 голосов
/ 14 сентября 2018

Вы ссылаетесь на 5.1.0 JAR для перехватчиков, которых нет на изображении latest. Если вы docker-compose exec connect bash и пойдете по указанному пути, вы увидите, какая версия там (в настоящее время 5.0.0 в latest). Так что измени свой состав, чтобы прочитать

CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.0.0.jar

Посмотрите на https://github.com/rmoff/ksql/blob/clickstream-c3/ksql-clickstream-demo/docker-compose.yml пример работающего Docker Compose с Confluent Control Center и перехватчиков, работающих с Kafka Connect (а также KSQL, если вам интересно).

Для дальнейшей отладки проверьте:

  1. Файл журнала Kafka Connect - если перехватчики работают, вы должны увидеть

    [2018-03-02 11:39:38,594] INFO ConsumerConfig values:
    [...]
            interceptor.classes = [io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor]
    
    [2018-03-02 11:39:38,806] INFO ProducerConfig values:
    [...]
            interceptor.classes = [io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor]
    
    [2018-03-02 11:39:39,455] INFO creating interceptor (io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor:74)
    [2018-03-02 11:39:39,456] INFO creating interceptor (io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor:70)
    [2018-03-02 11:39:39,486] INFO MonitoringInterceptorConfig values:
            confluent.monitoring.interceptor.publishMs = 15000
            confluent.monitoring.interceptor.topic = _confluent-monitoring
    (io.confluent.monitoring.clients.interceptor.MonitoringInterceptorConfig:223)
    [2018-03-02 11:39:39,486] INFO MonitoringInterceptorConfig values:
            confluent.monitoring.interceptor.publishMs = 15000
            confluent.monitoring.interceptor.topic = _confluent-monitoring
    (io.confluent.monitoring.clients.interceptor.MonitoringInterceptorConfig:223)
    
  2. Подробную информацию о control-center-console-consumer, которую можно использовать для проверки фактических полученных данных перехватчика, можно использовать в Центре управления Confluent Control Center (или нет, если что-то настроено неправильно) .

...