настроить приемник kafka connect для упругого поиска 7.1 с помощью docker compose - PullRequest
0 голосов
/ 16 октября 2019

Я настраиваю производителя, который отправляет сообщения в виде (значения ключа) [key - это сгенерированный уникальный string, value - это json полезная нагрузка] темам kafka (v1.0.0), которыетянется соединением kafka (v5.3.1), которое затем отправляется в контейнер поиска Elastic (v 7.1).

Соединение kafka настроено на поиск в ES индекса с именем темы (индексуже сопоставлен со схемой ES) и использует kafka key в качестве уникального идентификатора (_id) для каждого документа, вставленного в индекс. Как только производитель помещает контент в тему kafka, он должен быть извлечен с помощью connect и отправлен в ES.

Для соединения kafka (5.3.1) значение, отправленное ему из темы kafka, должно иметь форму, показанную ниже, чтобы сопоставить его с индексом упругого поиска

{
"schema": {es_schema },
"payload":{ es_payload }
}

Myпродюсер может только отправлять

{
es_payload
}

Я использую контейнеры docker / docker-compose для имитации этого локально

У меня есть производитель, который может отправлять на kafka и его забирает kafkaподключиться, но не удается отправить в эластичное состояние, заявив, что схема не найдена в полезной нагрузке

Моя конфигурация для kafka подключить приемник

curl -X POST \
  http://localhost:8083/connectors/ \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "adn-kafka",
    "key.ignore": "false",
    "schema.ignore": "false",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "",
    "name": "elasticsearch-sink",
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable":"false"
  }
}'

Я получаю ошибку

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
     at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:338)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
     ... 13 more

, если яустановите schema.ignore : true, он не ищет индекс со схемой, и я не думаю, что это правильный путь, потому что мой индекс уже сопоставлен, и я не хочу, чтобы kafka connect отправлял для создания нового индекса

мой докерсоставьте

version: '3'
services:
  zookeeper:
    container_name : zookeeper
    image: zookeeper
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888

  kafka:
    container_name : kafka
    image: bitnami/kafka:1.0.0-r5
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: "42"
      KAFKA_ADVERTISED_HOST_NAME: "kafka"
      ALLOW_PLAINTEXT_LISTENER: "yes" 

  elasticsearch:
    container_name : elasticsearch
    image:
      elasticsearch:7.1.1
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    environment:
      - cluster.name=docker-cluster
      - node.name=node1
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms4g -Xmx4g"
      - discovery.type=single-node

    ports:
      - "9400:9200"
      - "9500:9300"
    deploy:
      resources:
        limits:
          memory: 6G
        reservations:
          memory: 6G
  kibana:
    container_name : kibana
    image: docker.elastic.co/kibana/kibana:7.1.1
    # environment:
      # - SERVER_NAME=Local kibana
      # - SERVER_HOST=0.0.0.0
      # - ELASTICSEARCH_URL=elasticsearch:9400
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

  kafka-connect:
    container_name : kafka-connect
    image: confluentinc/cp-kafka-connect:5.3.1
    ports:
      - 8083:8083
    depends_on:
      - zookeeper
      - kafka
    volumes:
      - $PWD/connect-plugins:/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: docker-kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-kafka-connect-status
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER-SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER-SCHEMAS_ENABLE: "false"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java'
      # Interceptor config
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.3.1.jar

имя темы kafka: test-kafka

Es индекс: test-kafka

ES mapping

{
    "mappings":{
        "properties" :{
            "ppid":{
                "type":"long"
            },
            "field1":{
                "type":"long"
            },
            "field2":{
                "type":"long"
            },
            "time1":{
                "type":"date",
                "format":"yyyy-MM-dd HH:mm:ss"
            },
            "time2":{
                "type":"date",
                "format":"yyyy-MM-dd HH:mm:ss"
            },
            "status":{
                "type":"keyword"
            },
            "field3":{
                "type":"integer"
            },
            "field4":{
                "type":"integer"
            }
        }
    }
}

полезная нагрузка отправляется наКафка тема

{ "ppid" : 1, "field1":2 , "field2":1,"time1":"2019-09-25 07:36:48", "time2":"2019-09-25 07:36:48", "status":"SUCCESS", "field3":30,"field4":16}
...