Я настраиваю производителя, который отправляет сообщения в виде (значения ключа) [key
- это сгенерированный уникальный string
, value
- это json
полезная нагрузка] темам kafka (v1.0.0), которыетянется соединением kafka (v5.3.1), которое затем отправляется в контейнер поиска Elastic (v 7.1).
Соединение kafka настроено на поиск в ES индекса с именем темы (индексуже сопоставлен со схемой ES) и использует kafka key
в качестве уникального идентификатора (_id) для каждого документа, вставленного в индекс. Как только производитель помещает контент в тему kafka, он должен быть извлечен с помощью connect и отправлен в ES.
Для соединения kafka (5.3.1) значение, отправленное ему из темы kafka, должно иметь форму, показанную ниже, чтобы сопоставить его с индексом упругого поиска
{
"schema": {es_schema },
"payload":{ es_payload }
}
Myпродюсер может только отправлять
{
es_payload
}
Я использую контейнеры docker / docker-compose для имитации этого локально
У меня есть производитель, который может отправлять на kafka и его забирает kafkaподключиться, но не удается отправить в эластичное состояние, заявив, что схема не найдена в полезной нагрузке
Моя конфигурация для kafka подключить приемник
curl -X POST \
http://localhost:8083/connectors/ \
-H 'Content-Type: application/json' \
-d '{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "adn-kafka",
"key.ignore": "false",
"schema.ignore": "false",
"connection.url": "http://elasticsearch:9200",
"type.name": "",
"name": "elasticsearch-sink",
"value.converter.schemas.enable": "false",
"key.converter.schemas.enable":"false"
}
}'
Я получаю ошибку
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:338)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
, если яустановите schema.ignore : true
, он не ищет индекс со схемой, и я не думаю, что это правильный путь, потому что мой индекс уже сопоставлен, и я не хочу, чтобы kafka connect отправлял для создания нового индекса
мой докерсоставьте
version: '3'
services:
zookeeper:
container_name : zookeeper
image: zookeeper
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
container_name : kafka
image: bitnami/kafka:1.0.0-r5
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: "42"
KAFKA_ADVERTISED_HOST_NAME: "kafka"
ALLOW_PLAINTEXT_LISTENER: "yes"
elasticsearch:
container_name : elasticsearch
image:
elasticsearch:7.1.1
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
environment:
- cluster.name=docker-cluster
- node.name=node1
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms4g -Xmx4g"
- discovery.type=single-node
ports:
- "9400:9200"
- "9500:9300"
deploy:
resources:
limits:
memory: 6G
reservations:
memory: 6G
kibana:
container_name : kibana
image: docker.elastic.co/kibana/kibana:7.1.1
# environment:
# - SERVER_NAME=Local kibana
# - SERVER_HOST=0.0.0.0
# - ELASTICSEARCH_URL=elasticsearch:9400
ports:
- "5601:5601"
depends_on:
- elasticsearch
kafka-connect:
container_name : kafka-connect
image: confluentinc/cp-kafka-connect:5.3.1
ports:
- 8083:8083
depends_on:
- zookeeper
- kafka
volumes:
- $PWD/connect-plugins:/connect-plugins
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: docker-kafka-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-kafka-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-kafka-connect-status
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_KEY_CONVERTER-SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER-SCHEMAS_ENABLE: "false"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: '/usr/share/java'
# Interceptor config
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.3.1.jar
имя темы kafka: test-kafka
Es индекс: test-kafka
ES mapping
{
"mappings":{
"properties" :{
"ppid":{
"type":"long"
},
"field1":{
"type":"long"
},
"field2":{
"type":"long"
},
"time1":{
"type":"date",
"format":"yyyy-MM-dd HH:mm:ss"
},
"time2":{
"type":"date",
"format":"yyyy-MM-dd HH:mm:ss"
},
"status":{
"type":"keyword"
},
"field3":{
"type":"integer"
},
"field4":{
"type":"integer"
}
}
}
}
полезная нагрузка отправляется наКафка тема
{ "ppid" : 1, "field1":2 , "field2":1,"time1":"2019-09-25 07:36:48", "time2":"2019-09-25 07:36:48", "status":"SUCCESS", "field3":30,"field4":16}