Kafka-confluent: Как использовать pk.mode = record_key для режима upsert и delete в коннекторе приемника JDB C? - PullRequest
0 голосов
/ 07 апреля 2020

В Kafka Confluent, как мы можем использовать upsert, используя источник в качестве CSV-файла, при использовании pk.mode=record_key для составного ключа в таблице MySQL? Режим upsert работает при использовании pk.mode=record_values. Есть ли необходимость в дополнительной настройке?

Я получаю эту ошибку, если пытаюсь использовать pk.mode=record_key. Ошибка - вызвано: org.apache.kafka.connect.errors.ConnectException: нужен только один столбец PK, поскольку схема ключей для записей является примитивным типом. Ниже моя конфигурация разъема мойки JDB C:

    {
    "name": "<name>",
    "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "<topic name>",
    "connection.url": "<url>",
    "connection.user": "<user name>",
    "connection.password": "*******",
    "insert.mode": "upsert",
    "batch.size": "50000",
    "table.name.format": "<table name>",
    "pk.mode": "record_key",
    "pk.fields": "field1,field2",
    "auto.create": "true",
    "auto.evolve": "true",
    "max.retries": "10",
    "retry.backoff.ms": "3000",
    "mode": "bulk",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}

1 Ответ

0 голосов
/ 07 апреля 2020

Вам нужно использовать pk.mode из record.value. Это означает, что поле (поля) взято из значения сообщения и используется в качестве первичного ключа в целевой таблице и для целей UPSERT.

Если вы установите record.key, он попытается взять ключевые поля из сообщения Кафки ключ . Если вы на самом деле не получили значения в своем ключе сообщения, это не тот параметр, который вы хотите использовать.

Это может помочь вам в дальнейшем:

...