Поддерживает ли Elasticsearch Sink Connector режим upsert для первичного ключа, как это делает коннектор приемника JDBC? - PullRequest
0 голосов
/ 11 декабря 2019

Я перемещаю данные из Mongodb -> Elasticsearch, используя kafka connect. На данный момент обновленные записи вставляются как новые документы в индексы Elasticsearch. Однако я хочу обновить существующие записи на основе идентификатора (аналогично write.mode = upsert в JDBC Sink Connector). Это возможно?

1 Ответ

0 голосов
/ 13 декабря 2019

Я решил проблему, настроив key.ignore=false, а затем преобразовал ключ события из {id: 1234} to 1234, используя ExtractField SMT

    key.ignore=false
    transforms=ExtractField
    transforms.ExtractField.field=id
    transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Key
...