Я пытаюсь настроить тест для перемещения данных из MySQL в Elasticsearch.
У меня есть докерская установка с брокером, zookeeper, connect, ksql server и cli, реестром схемы и Elasticsearch.Я использую образы докеров из конфлюэнтной версии 5.1.0 , а для Elasticsearch я использую asticsearch: 6.5.4
Я настроил JDBCСоединитель для передачи данных из MySQL в Kafka, это работает. Я вижу, что мои темы созданы и, используя ksql-cli, я вижу новые сообщения в потоке, когда я обновляю строки в MySQL.
Я также настроил соединитель приемника Elasticsearch соединитель успешно создан, и индекс в Elasticsearch также существует, но я вижу нет документов в моем индексе Elasticsearch .
Это ESКонфигурация коннектора раковины:
{
"name": "es-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"connection.url": "http://es:9200",
"type.name": "_doc",
"topics": "test_topic",
"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true
}
}
Это то, что я вижу, когда запрашиваю состояние коннектора раковины: curl -X GET http://connect:8083/connectors/es-connector
{
"name": "es-connector",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "connect:8083"
}
],
"type": "sink"
}
В Elasticsearch я вижу индекс http://es:9200/test_topic/_search
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 0,
"max_score": null,
"hits": []
}
}
Я продолжаю делать обновления и вставки в MySQL, я вижу сообщения в потоке, используя ksql-cli, но в Elasticsearch документы не создаются.Я даже создал тему вручную, используя kafka-avro-console-producer
и опубликовал сообщения, затем создал второй коннектор приемника для этой темы и тот же результат, я вижу индекс, но нет документов.
Я не вижу ошибок в kafka-connectпоэтому я не понимаю, почему не работает.Что-то не так с конфигурацией разъема?Я что-то упустил?
Редактировать:
Для конфигурации приемника Elasticsearch, которую я пробовал с и без этих строк:
"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true
И результат тот же, документов нет.
Редактировать
Я обнаружил ошибку:
Ключ используется в качестве идентификатора документа и не может быть нулевым
.