`delete.enabled = true` не удаляет запись в MySQL через JDB C коннектор приемника - PullRequest
0 голосов
/ 15 апреля 2020

Мой файл конфигурации приемника содержит следующие конфигурации -

...
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "DELETESRC",
"insert.mode": "upsert",
"batch.size": "50000",
"table.name.format": "DELETESRC",
"pk.mode": "record_key",
"pk.fields": "ID,C_NO",
"delete.enabled": "true",
"auto.create": "true",
"auto.evolve": "true",
"max.retries": "10",
"retry.backoff.ms": "3000",
"mode": "bulk",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081",
"transforms": "ValueToKey",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "ID,C_NO"
...

Я могу использовать upsert с помощью клавиш, но не могу использовать режим удаления в приемнике JDB C. Я настроил topi c DELETESRC как cleanup.policy=compact, delete.retention.ms=0. Я создал поток K SQL как с 4 столбцами (ID,CMP,SEG,C_NO) и использовал вставку в операторы в K SQL для pu sh данных.

INSERT INTO DELETESRC VALUES ('null','11','D','1','3')
INSERT INTO DELETESRC VALUES ('null','11','C','1','4')
INSERT INTO DELETESRC VALUES ('null','12','F','1','3')

Но когда я делаю INSERT INTO DELETESRC VALUES ('null','11','null','null','3'), приемник обновляет таблицу как 11,null,null,3. Я изучил другие ответы в переполнении стека, но эти решения не сработали.

Я делаю какую-либо ошибку при создании записи надгробной плиты?

Я пробовал другие способы в операторе вставки в K SQL, но операция удаления не выполняется.

1 Ответ

0 голосов
/ 15 апреля 2020

Для создания правильного сообщения-надгробия вам необходимо предоставить сообщение с ключом со значением null. В этом примере вы не показываете нулевое значение.


Кроме того, я думаю, вам нужно увеличить delete.retention.ms:

Сумма времени, чтобы сохранить удаление надгробных маркеров для сжатых тем журнала. Этот параметр также определяет время, в течение которого потребитель должен завершить чтение, если он начинает со смещения 0, чтобы гарантировать, что он получит действительный снимок последнего этапа (в противном случае удаляемые надгробия могут быть собраны до завершения сканирования). ) .

...