ElasticSearch IO Как удалить идентификатор из документа JSON перед записью - PullRequest
0 голосов
/ 03 декабря 2018

У меня есть потоковое задание Apache Beam, которое считывает данные из Kafka и пишет в ElasticSearch, используя ElasticSearchIO.

Проблема, с которой я сталкиваюсь, состоит в том, что сообщения в Kafka уже имеют поле key и использование ElasticSearchIO.Write.withIdFn() Я сопоставляю это поле с полем _id документа в ElasticSearch.

Имея большой объем данных, я не хочу, чтобы поле key также записывалось в ElasticSearch как часть _source.

Есть ли опция / обходной путь, который позволил бы сделать это?

Ответы [ 2 ]

0 голосов
/ 04 февраля 2019

Я создал заявку в Apache Beam JIRA , описывающую эту проблему.

На данный момент исходная проблема не может быть решена в рамках процесса индексации с использованием Apache Beam API.

Обходной путь, предложенный одним из сопровождающих Этьеном Чаучо, состоит в том, чтобы создать отдельную задачу, которая очистит индексированные данные после слов.

См. Например, удалить поле из документа Elasticsearch .

В будущем, если кто-то также захочет использовать такую ​​функцию, вы можете перейти по связанной ссылке.билет.

0 голосов
/ 03 декабря 2018

Используя Ingest API и процессор удаления , вы сможете решить эту проблему довольно просто, только с помощью вашего кластера эластичного поиска.Вы также можете смоделировать конвейер загрузки и результаты.

Я подготовил пример, который, вероятно, охватит ваш случай:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "remove id form incoming docs",
    "processors": [
      {"remove": {
        "field": "id",
        "ignore_failure": true
      }}
    ]
  },
  "docs": [
      {"_source":{"id":"123546", "other_field":"other value"}}
    ]
}

Видите ли, есть одинтестовый документ, содержащий поле "id".Это поле больше не присутствует в ответе / результате:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_type",
        "_id" : "_id",
        "_source" : {
          "other_field" : "other value"
        },
        "_ingest" : {
          "timestamp" : "2018-12-03T16:33:33.885909Z"
        }
      }
    }
  ]
}
...