Kafka Connect - вызвано: org. apache .kafka.connect.errors.ConnectException: режим PK для таблицы - RECORD_KEY, но схема ключа записи отсутствует - PullRequest
1 голос
/ 13 апреля 2020

У меня есть jdb c -sink для передачи данных из Kafka в Oracle База данных.

Мое соединение дает эту ошибку.

Caused by: org.apache.kafka.connect.errors.ConnectException: PK mode for table 'orders' is RECORD_KEY, but record key schema is missing

свойства мойки:

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "MESSAGE_KEY",
    "insert.mode": "update ",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}

мои connect-avro-distributed.properties :

bootstrap.servers=10.0.0.0:9092

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

Я отправляю данные вот так :

./bin/kafka-avro-console-producer \
--broker-list 10.0.0.0:9092 --topic orders \
--property parse.key="true" \
--property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
--property key.separator="$" \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"}, {"name":"quantity", "type": "int"}, {"name":"price","type": "int"}]}' \
--property schema.registry.url=http://10.0.0.0:8081

Как я могу решить эту проблему?

заранее спасибо

1 Ответ

2 голосов
/ 13 апреля 2020

Кажется, проблема в вашей полезной нагрузке и конфигурации "pk.mode": "record_key".

pk.mode используется для определения режима первичного ключа, и у вас есть следующие параметры конфигурации:

  • none: ключи не используются
  • kafka : Координаты Кафки используются в качестве PK
  • record_key: используются поля из ключа записи, которые могут быть примитивами или структурами.
  • record_value: Field ( s) из записи используются значения, которые должны быть структурой.

В вашей конфигурации вы используете record_key, что означает, что Kafka Connect получит поле от ключа сообщение и использовать его в качестве первичного ключа в целевой таблице Oracle.

Несмотря на то, что вы не поделились конфигурацией своего рабочего Kafka Connect, я предполагаю, что вам не хватает некоторых параметров конфигурации.

В соответствии с документацией ,

Разъем приемника требует знания схем, поэтому вы должны использовать подходящий преобразователь, например преобразователь Avro, который поставляется со схемой реестр или конвертер JSON с включенными схемами. Ключи записи Kafka, если они есть, могут быть примитивными типами или структурой Connect, а значение записи должно быть структурой Connect. Поля, выбираемые из структур Connect, должны быть примитивных типов. Если данные в теме не имеют совместимого формата, может потребоваться реализация пользовательского Converter.


Теперь в вашем случае проблема, похоже, "pk.fields", которая в настоящее время установите на "pk.fields": "MESSAGE_KEY". В вашей схеме ключ сообщения определен как id. Таким образом, следующее должно быть сделано:

"pk.fields": "id"
...