У нас возникают проблемы с обеспечением порядка, в котором сообщения из темы Kafka отправляются в Elasticsearch с помощью Kafka Connect Elasticsearch Connector. В этой теме сообщения располагаются в правильном порядке с правильными смещениями, но если два сообщения с одинаковым идентификатором создаются в быстрой последовательности, они периодически отправляются в Elasticsearch в неправильном порядке. Это приводит к тому, что Elasticsearch получает данные из второго последнего сообщения, а не из последнего сообщения. Если мы добавим искусственную задержку в одну или две секунды между двумя сообщениями в теме, проблема исчезнет.
В документации здесь указано:
Порядок обновления на уровне документа обеспечивается использованием уровня раздела
Смещение Кафки в качестве версии документа и использование version_mode=external
.
Однако я нигде не могу найти документацию об этой настройке version_mode
и о том, нужно ли нам что-то устанавливать для нее.
В файлах журнала системы Kafka Connect мы видим, что два сообщения (для одного и того же идентификатора) обрабатываются в неправильном порядке, с интервалом в несколько миллисекунд. Возможно, важно, что они обрабатываются в разных потоках. Также обратите внимание, что для этой темы существует только один раздел, поэтому все сообщения находятся в одном разделе.
Ниже приведен фрагмент журнала, слегка отредактированный для ясности. Сообщения в теме Кафки заполнены Debezium, который, я думаю, не имеет отношения к проблеме, но, как оказалось, легко включает значение метки времени. Это показывает, что сообщения обрабатываются в неправильном порядке (хотя они находятся в правильном порядке в теме Kafka, заполненной Debezium):
[2019-01-17 09:10:05,671] DEBUG http-outgoing-1 >> "
{
"op": "u",
"before": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM BEFORE SECOND UPDATE >> ...
},
"after": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM AFTER SECOND UPDATE >> ...
},
"source": { ... },
"ts_ms": 1547716205205
}
" (org.apache.http.wire)
...
[2019-01-17 09:10:05,696] DEBUG http-outgoing-2 >> "
{
"op": "u",
"before": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM BEFORE FIRST UPDATE >> ...
},
"after": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM AFTER FIRST UPDATE >> ...
},
"source": { ... },
"ts_ms": 1547716204190
}
" (org.apache.http.wire)
Кто-нибудь знает, как заставить этот соединитель поддерживать порядок сообщений для данного идентификатора документа при отправке сообщений в Elasticsearch?