Почему запись не может быть удалена с помощью JDB C Sink Connector в Kafka Connect - PullRequest
1 голос
/ 13 апреля 2020

мои свойства раковины:

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}

мои connect-avro-distributed.properties :

bootstrap.servers=10.0.0.0:9092

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

I отправьте данные следующим образом:

./bin/kafka-avro-console-producer \
  --broker-list 10.0.0.0:9092 --topic orders \
  --property parse.key="true" \
  --property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
  --property key.separator="$" \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type":  ["null","int"],"default": null}, {"name":"price","type":  ["null","int"],"default": null}]}' \
  --property schema.registry.url=http://10.0.0.0:8081

Я могу вставить или обновить данные на oracle вот так

{"id":3}${"id": {"int":2}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":1453}}

, но когда я поставить это, чтобы удалить запись на oracle он не может удалить данные просто обновить столбцы как ноль

{"id":2}${"id": null, "product": null , "quantity": null , "price": null }

как я могу решить эту проблему?

заранее спасибо

1 Ответ

1 голос
/ 13 апреля 2020

То, что вы пытаетесь создать, на самом деле Tombstone Record . В Kafka удаление с использованием JDB C Sink Connector работает следующим образом:

Соединитель может удалять строки в таблице базы данных, когда он использует запись-захоронение, которая является записью Kafka с ненулевым значением ключ и нулевое значение. Это поведение по умолчанию отключено, что означает, что любые записи-захоронения приведут к выходу из строя соединителя, что упрощает обновление соединителя JDB C и сохраняет прежнее поведение.

Удаление можно включить с помощью delete.enabled=true, но только когда pk.mode установлен на record_key. Это связано с тем, что удаление строки из таблицы требует использования первичного ключа в качестве критерия.

Включение режима удаления не влияет на insert.mode.

Также обратите внимание, что этот вид запись будет удалена только через delete.retention.ms миллисекунд, которая по умолчанию составляет 24 часа.


Поэтому попробуйте уменьшить эту конфигурацию в своих свойствах и посмотреть, работает ли она. Для этого вам нужно выполнить следующую команду:

 ./bin/kafka-topics.sh \
    --alter \
    --zookeeper localhost:2181 \
    --topic orders \
    --config retention.ms=100 

Теперь, когда настройка завершена, все, что вам нужно сделать, это сгенерировать сообщение с ненулевым ключом и нулевым значением , Обратите внимание, что Kafka Console Consumer нельзя использовать для создания нулевой записи, поскольку пользовательский ввод анализируется как UTF-8. Следовательно,

{"id":2}${"id": null, "product": null , "quantity": null , "price": null }

не является реальным сообщением о надгробии.

Однако вы можете использовать kafkacat, но он работает только для JSON сообщений:

# Produce a tombstone (a "delete" for compacted topics) for key 
# "abc" by providing an empty message value which -Z interpretes as NULL:

echo "abc:" | kafkacat -b mybroker -t mytopic -Z -K:

, но в вашем случае это не сработает как вам нужно отправлять авро сообщения. Поэтому я бы посоветовал написать очень простой Avro Producer на вашем любимом языке, чтобы вы могли отправлять надгробные сообщения.

...