Приемник Elasticsearch получает только новые сообщения, а не предыдущее с использованием kafka-connect -asticsearch + timestamp SMT - PullRequest
0 голосов
/ 25 декабря 2018

Я использую плагин kafka-connect -asticsearch для получения сообщения от моей kafka на Elasticsearch.Мои данные в kafka содержат поле даты (формат отметки времени).

Моя первая проблема заключалась в том, что при использовании этого плагина индекс Elasticsearch не распознавал поле даты как тип даты, но как длинный ...Я вроде как решаю это с помощью преобразования SMT в конфигурации моего коннектора.

Вот моя текущая конфигурация, которая позволяет мне передавать данные в Elastic:

{
  "name": "elasticsearch-sink-test",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "test.test",
    "key.ignore": "true",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
    "name": "elasticsearch-sink-test",
    "Batch.size": 100,
    "max.buffered.records": 1000,
    "Max.retries": 10,
    "Retry.backoff.ms": 1000,
    "flush.timeout.ms": 20000,
    "max.in.flight.requests": 3
    "transforms": "date",
    "transforms.date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.date.target.type": "Date",
    "transforms.date.field": "date",
    "transforms.date.format": "yyyy-MM-dd HH:mm:ss"
  }
}

Моя проблема сейчас заключается в том, что:Elasticsearch не получает все предыдущие сообщения, сохраненные в kafka, а только новое (все новые сообщения, которые передаются в kafka после того, как я запустил соединитель Elasticsearch).

Как настроить соединитель, чтобы сделать его эластичнымполучить все сообщения?Есть ли обходной путь, который делает упругий «понимающий», что поле даты является меткой времени?

(Для информации, мой источник данных - MongoDB с разъемом CDC debezium)

Заранее спасибо

1 Ответ

0 голосов
/ 25 декабря 2018

Как я могу настроить соединитель так, чтобы упругий получал все сообщения?

Точно так же, как обычному потребителю Kafka, вам нужно установить самые ранние смещения

consumer.auto.offset.reset=earliest 

Есть ли обходной путь, который упругий "понимает", что датаполе является отметкой времени?

Да, с использованием индекса или динамического сопоставления в Elasticsearch.По умолчанию все принятые числа являются только числовыми значениями.Только правильно отформатированные строки даты фактически индексируются как даты.Если вы не управляете сервером Elasticsearch или настройками индекса, это обычно настраивается администратором этой системы

...