Отметка времени в схеме avro приводит к несовместимой проверке значений в Kafka Connect JDBC - PullRequest
0 голосов
/ 02 декабря 2018

Ошибка, вызванная соединителем приемника JDBC:

org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.util.Date for field: "some_timestamp_field"
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:151)
at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:107)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Схема avro, зарегистрированная исходным соединителем JDBC (MySQL):

{  
   "type":"record",
   "name":"ConnectDefault",
   "namespace":"io.confluent.connect.avro",
   "fields":[  
      ...
      {  
         "name":"some_timestamp_field",
         "type":{  
            "type":"long",
            "connect.version":1,
            "connect.name":"org.apache.kafka.connect.data.Timestamp",
            "logicalType":"timestamp-millis"
         }
      },
      ...
   ]
}

Похоже, исключение связано с этим блоком кода: https://github.com/apache/kafka/blob/f0282498e7a312a977acb127557520def338d45c/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L239

Таким образом, в схеме avro поле отметки времени зарегистрировано как INT64 с правильным (отметка времени) логическим типом.Но connect считывает тип схемы как INT64 и сравнивает его с типом значения java.util.Date.

Это ошибка или есть обходной путь?Возможно, я что-то упустил, так как это выглядит как стандартная модель подключения.

Заранее спасибо.

ОБНОВЛЕНИЕ

Конфигурация разъема раковины:

{
    "name": "sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "topic",
        "connection.url": "jdbc:postgresql://host:port/db",
        "connection.user": "user",
        "connection.password": "password",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://host:port",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://host:port",

        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "id"
    }
}

Десериализованные данные в Кафке:

{
   "id":678148,
   "some_timestamp_field":1543806057000,
   ...
}

1 Ответ

0 голосов
/ 04 декабря 2018

Мы разработали work around для этой проблемы.Наша цель состояла в том, чтобы преобразовать идентификатор из BIGINT в STRING (TEXT / VARCHAR) и сохранить запись в нисходящем БД.

Но из-за проблемы (вероятно, https://issues.apache.org/jira/browse/KAFKA-5891), приведение поля id не работало. Кафка пытался проверить поля меток времени также в цепочке преобразования, но неправильно читал тип / имя схемыи в результате несоответствие типов (см. приведенное выше тело записи и журнал ошибок).

Таким образом, мы сделали следующее:

extract only the id field as key -> execute cast transform on the key -> it works as key does not contain timestamp field.

Вот обходная конфигурация:

{
    "name": "sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "topic",
        "connection.url": "jdbc:postgresql://host:port/db",
        "connection.user": "user",
        "connection.password": "password",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://host:port",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://host:port",

        "transforms": "createKey,castKeyToString",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "id",

        "transforms.castKeyToString.type": "org.apache.kafka.connect.transforms.Cast$Key",
        "transforms.castKeyToString.spec": "id:string",

        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "id"
    }
}

Отказ от ответственности: Это неправильное решение, просто обходной путь. Ошибка преобразования преобразования должна бытьисправлено. На мой взгляд, преобразование приведения должно касаться только полей, предназначенных для приведения, а не других полей в сообщении.

Хорошего дня.

...