У меня запущена настройка Kafka с соединителем Elasticsearch, и я успешно индексирую новые документы в индекс ES на основе входящих сообщений по определенной теме.
Однако на основе входящих сообщений по другой темеМне нужно добавить данные в поле определенного документа в том же индексе.
Псевдо-схема ниже:
{
"_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"uuid": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"title": "A title",
"body": "A body",
"created_at": 164584548,
"views": []
}
^ Этот документ в ES создается нормально на основеданные в теме, упомянутой выше.
Однако, как мне затем добавить элементы в поле views
, используя сообщения из другой темы. Вот так:
article-view
схема темы:
{
"article_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"user_id": 123456,
"timestamp: 136389734
}
и вместо простого создания нового документа по индексу article-view
(который я даже не хочу иметь). Он добавляет это к полю views
в документе статьи с соответствующим _id
, равным article_id
из сообщения.
, поэтому конечный результат после одного сообщения будет:
{
"_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"uuid": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"title": "A title",
"body": "A body",
"created_at": 164584548,
"views": [
{
"user_id": 123456,
"timestamp: 136389734
}
]
}
Использование ES API возможно с помощью скрипта. Например:
{
"script": {
"lang": "painless",
"params": {
"newItems": [{
"timestamp": 136389734,
"user_id": 123456
}]
},
"source": "ctx._source.views.addAll(params.newItems)"
}
}
Я могу динамически создавать сценарии, как указано выше, а затем использовать функцию helpers.bulk
в библиотеке ES Python для массового обновления документов таким способом.
Этовозможно с Kafka Connect / Elasticsearch? На веб-сайте Confluent я не нашел никакой документации, объясняющей, как это сделать.
Это кажется довольно стандартным требованием и очевидной вещью, которую люди должны были бы сделать с разъемом раковины Kafka / A, таким как ES.
Спасибо!