kafka-connect -asticsearch: При использовании «write.method» в качестве upsert, возможно ли использовать тот же объект AVRO на kafka topi c для отправки частичного документа? - PullRequest
1 голос
/ 02 мая 2020

Я пытаюсь использовать "write.method" для коннектора Elasticsearch (ES) kafka. Из моего приложения kafka streams я пишу свой документ, который хочу сохранить, на kafka topi c, с которого ES-соединитель настроен для чтения. Я использую объекты avro в качестве значений kafka для этой топи c. Определение AVRO моего документа выглядит следующим образом:

{
  "type": "record",
  "name": "Document",
  "fields": [
    {
      "name": "id",
      "type": ["null", "string"],
    },
    {
      "name": "name",
      "type": ["null", "string"]
    },
    {
      "name": "address",
      "type": ["null", "string"]
    }
  ]
}

Документ содержит только идентификатор и имя, а иногда и просто адрес. имя и имя перезаписываются, когда я просто отправляю адрес и наоборот. Я установил behavior.on.null.values на ignore, надеясь, что ES-соединитель будет игнорировать нулевые значения идентификатора и имени, но это не сработает, как ожидалось.

Хотя, когда я использую два различных объекта AVRO на моем kafka topi c, первый из которых содержит только идентификатор и имя, а другой - только адрес, поведение режима upsert является ожидаемым. Но для того же kafka topi c, чтобы разрешить несколько определений объектов AVRO, мне нужно установить режим совместимости topi c на NONE, что не идеально.

Как правильно решить эту проблему?

1 Ответ

2 голосов
/ 02 мая 2020

Параметр behavior.on.null.values = ignore просто сообщает соединителю, что если он получит сообщение, в котором все сообщение равно нулю, игнорировать это сообщение (другие параметры могут быть неудачными или удалить целевой документ в Elasticsearch, сопоставляющий ключ сообщения с нулевым значением, то есть надгробным сообщением).

Соединитель не поддерживает поведение, которое вы описываете для частичных обновлений. Он может вставлять / обновлять / удалять, но только целые документы

Если вы хотите, чтобы поведение частичной загрузки происходило тогда, вам нужно реализовать это самостоятельно, либо в пользовательском соединителе, либо в состоянии хранения в приложении Kafka Streams, чтобы возможность выдавать полную запись каждый раз, когда возникает дельта.

Возможны частичные обновления с write.method=upsert

...