Ошибка JDBC Kafka Connect на JsonConverter - PullRequest
0 голосов
/ 27 февраля 2019

Я работаю над дизайном MySQL -> Debezium -> Kafka -> Flink -> Kafka -> Kafka Connect JDBC -> MySQL.Ниже приведен пример сообщения, которое я пишу от Flink (я также пытался использовать производителя консоли Kafka)

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      }
    ],
    "optional": true,
    "name": "user"
  },
  "payload": {
    "id": 1,
    "name": "Smith"
  }
}

, но на JsonConverter не удалось установить соединение

DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:338)

Я отладил и в методе public SchemaAndValue toConnectData(String topic, byte[] value) значение равно нулю.Мои конфигурации раковин:

{
    "name": "user-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "user",
        "connection.url": "jdbc:mysql://localhost:3306/my_db?verifyServerCertificate=false",
        "connection.user": "root",
        "connection.password": "root",        
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}

Может кто-нибудь помочь мне в этом вопросе?

1 Ответ

0 голосов
/ 27 февраля 2019

Я думаю, что проблема не связана с сериализацией значения (сообщения Кафки).Это скорее проблема с ключом сообщения.

Какой у вас key.converter?Я думаю, что это то же самое, что и value.converter (org.apache.kafka.connect.json.JsonConverter).Ваш ключ может быть простым String, который не содержит schema, payload

Попробуйте изменить key.converter на org.apache.kafka.connect.storage.StringConverter

Для Kafka Connect вы установите значение по умолчанию Converters, но вы также можете установить конкретный для вашей конкретной конфигурации Соединителя (который будет перезаписан по умолчанию).Для этого вы должны изменить ваш запрос конфигурации:

{
    "name": "user-sink",
    "config": {
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "user",
        "connection.url": "jdbc:mysql://localhost:3306/my_db?verifyServerCertificate=false",
        "connection.user": "root",
        "connection.password": "root",        
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}
...