Я использую плагин kafka-connect -asticsearch для получения сообщения от моей kafka на Elasticsearch.Мои данные в kafka содержат поле даты (формат отметки времени).
Моя первая проблема заключалась в том, что при использовании этого плагина индекс Elasticsearch не распознавал поле даты как тип даты, но как длинный ...Я вроде как решаю это с помощью преобразования SMT в конфигурации моего коннектора.
Вот моя текущая конфигурация, которая позволяет мне передавать данные в Elastic:
{
"name": "elasticsearch-sink-test",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "test.test",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect",
"name": "elasticsearch-sink-test",
"Batch.size": 100,
"max.buffered.records": 1000,
"Max.retries": 10,
"Retry.backoff.ms": 1000,
"flush.timeout.ms": 20000,
"max.in.flight.requests": 3
"transforms": "date",
"transforms.date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.date.target.type": "Date",
"transforms.date.field": "date",
"transforms.date.format": "yyyy-MM-dd HH:mm:ss"
}
}
Моя проблема сейчас заключается в том, что:Elasticsearch не получает все предыдущие сообщения, сохраненные в kafka, а только новое (все новые сообщения, которые передаются в kafka после того, как я запустил соединитель Elasticsearch).
Как настроить соединитель, чтобы сделать его эластичнымполучить все сообщения?Есть ли обходной путь, который делает упругий «понимающий», что поле даты является меткой времени?
(Для информации, мой источник данных - MongoDB с разъемом CDC debezium)
Заранее спасибо