Порядок сообщений с Kafka Connect Elasticsearch Connector - PullRequest
0 голосов
/ 17 января 2019

У нас возникают проблемы с обеспечением порядка, в котором сообщения из темы Kafka отправляются в Elasticsearch с помощью Kafka Connect Elasticsearch Connector. В этой теме сообщения располагаются в правильном порядке с правильными смещениями, но если два сообщения с одинаковым идентификатором создаются в быстрой последовательности, они периодически отправляются в Elasticsearch в неправильном порядке. Это приводит к тому, что Elasticsearch получает данные из второго последнего сообщения, а не из последнего сообщения. Если мы добавим искусственную задержку в одну или две секунды между двумя сообщениями в теме, проблема исчезнет.

В документации здесь указано:

Порядок обновления на уровне документа обеспечивается использованием уровня раздела Смещение Кафки в качестве версии документа и использование version_mode=external.

Однако я нигде не могу найти документацию об этой настройке version_mode и о том, нужно ли нам что-то устанавливать для нее.

В файлах журнала системы Kafka Connect мы видим, что два сообщения (для одного и того же идентификатора) обрабатываются в неправильном порядке, с интервалом в несколько миллисекунд. Возможно, важно, что они обрабатываются в разных потоках. Также обратите внимание, что для этой темы существует только один раздел, поэтому все сообщения находятся в одном разделе.

Ниже приведен фрагмент журнала, слегка отредактированный для ясности. Сообщения в теме Кафки заполнены Debezium, который, я думаю, не имеет отношения к проблеме, но, как оказалось, легко включает значение метки времени. Это показывает, что сообщения обрабатываются в неправильном порядке (хотя они находятся в правильном порядке в теме Kafka, заполненной Debezium):

[2019-01-17 09:10:05,671] DEBUG http-outgoing-1 >> "
{
  "op": "u",
  "before": {
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM BEFORE SECOND UPDATE >> ...
  },
  "after": {
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM AFTER SECOND UPDATE >> ...
  },
  "source": { ... },
  "ts_ms": 1547716205205
}
" (org.apache.http.wire)

...

[2019-01-17 09:10:05,696] DEBUG http-outgoing-2 >> "
{
  "op": "u",
  "before": {
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM BEFORE FIRST UPDATE >> ...
  },
  "after": {
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM AFTER FIRST UPDATE >> ...
  },
  "source": { ... },
  "ts_ms": 1547716204190
}
" (org.apache.http.wire)

Кто-нибудь знает, как заставить этот соединитель поддерживать порядок сообщений для данного идентификатора документа при отправке сообщений в Elasticsearch?

1 Ответ

0 голосов
/ 18 января 2019

Проблема заключалась в том, что наш разъем Elasticsearch имел конфигурацию key.ignore, установленную на true.

Мы обнаружили эту строку в источнике Github для коннектора (в DataConverter.java ):

final Long version = ignoreKey ? null : record.kafkaOffset();

Это означало, что с key.ignore=true операции индексации, которые генерировались и отправлялись в Elasticsearch, были фактически «безвизовыми» ... в основном, последний набор данных, который Elasticsearch получил для документа, заменит любые предыдущие данные, даже если это были «старые данные».

При просмотре файлов журнала у соединителя, кажется, есть несколько потоков потребителей, читающих исходную тему, а затем передающих преобразованные сообщения в Elasticsearch, но порядок их передачи в Elasticsearch не обязательно совпадает с порядком тем.

Используя key.ignore=false, каждое сообщение Elasticsearch теперь содержит значение версии, равное смещению записи Kafka, и Elasticsearch отказывается обновлять данные индекса для документа, если он уже получил данные для более поздней "версии".

Это не было только , что исправило это. Нам все еще пришлось применить преобразование к сообщению Debezium из темы Kafka, чтобы получить ключ в текстовом формате, которым Elasticsearch был доволен:

"transforms": "ExtractKey",
"transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractKey.field": "id"
...