Используйте Kafka Connect для обновления поля Elasticsearch в существующем документе вместо создания нового - PullRequest
0 голосов
/ 16 октября 2019

У меня запущена настройка Kafka с соединителем Elasticsearch, и я успешно индексирую новые документы в индекс ES на основе входящих сообщений по определенной теме.

Однако на основе входящих сообщений по другой темеМне нужно добавить данные в поле определенного документа в том же индексе.

Псевдо-схема ниже:

{
   "_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
   "uuid": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
   "title": "A title",
   "body": "A body",
   "created_at": 164584548,
   "views": []
}

^ Этот документ в ES создается нормально на основеданные в теме, упомянутой выше.

Однако, как мне затем добавить элементы в поле views, используя сообщения из другой темы. Вот так:

article-view схема темы:

{
   "article_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
   "user_id": 123456,
   "timestamp: 136389734
}

и вместо простого создания нового документа по индексу article-view (который я даже не хочу иметь). Он добавляет это к полю views в документе статьи с соответствующим _id, равным article_id из сообщения.

, поэтому конечный результат после одного сообщения будет:

{
   "_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
   "uuid": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
   "title": "A title",
   "body": "A body",
   "created_at": 164584548,
   "views": [
       {
           "user_id": 123456,
           "timestamp: 136389734
       }
   ]
}

Использование ES API возможно с помощью скрипта. Например:

{
    "script": {
        "lang": "painless",
        "params": {
            "newItems": [{
                "timestamp": 136389734,
                "user_id": 123456
            }]
        },
        "source": "ctx._source.views.addAll(params.newItems)"
    }
}

Я могу динамически создавать сценарии, как указано выше, а затем использовать функцию helpers.bulk в библиотеке ES Python для массового обновления документов таким способом.

Этовозможно с Kafka Connect / Elasticsearch? На веб-сайте Confluent я не нашел никакой документации, объясняющей, как это сделать.

Это кажется довольно стандартным требованием и очевидной вещью, которую люди должны были бы сделать с разъемом раковины Kafka / A, таким как ES.

Спасибо!

1 Ответ

0 голосов
/ 16 октября 2019

Соединитель Elasticsearch не поддерживает это. Вы можете обновить документы на месте, но вам нужно отправить полный документ, а не дельту для добавления, что я думаю, что вы после.

...