У меня задание Kafka-Connect, настроенное на периодический запрос таблицы MySQL и размещение сообщений в очереди.Структура этих сообщений определяется с помощью схемы Avro.У меня проблема с сопоставлением для одного из моих столбцов.
Столбец определен как tinyint (1) в моей схеме MySQL, и я пытаюсь сопоставить его с логическим полем в моем объекте avro..
{
"name": "is_active",
"type": "boolean"
}
Задания kafka-connect запускаются, и сообщения помещаются в очередь, но когда мое приложениекто читает из очереди пытается десериализовать сообщения, я получаю следующую ошибку:
org.apache.avro.AvroTypeException: Found int, expecting boolean
Я надеялся, что значение 1 или 0 может быть автоматически сопоставлено с логическим значением, но это не похоже наcase.
Я также пытался настроить свою работу на использование преобразования «Cast», но, похоже, это вызвало проблемы с другими полями в сообщении.
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "is_active:boolean"
Возможно ли то, что я пытаюсь сделать, или мне придется изменить свое приложение для работы со значением int?
Вот моя полная конфигурация (я удалил некоторые другие не относящиеся к делу поля)
Конфигурация задания Kafka Connect
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "bulk",
"topic.prefix": "my_topic-name",
"transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"query": "select is_active from my_table",
"poll.interval.ms": "30000",
"transforms": "SetSchemaMetadata",
"name": "job_name",
"connection.url": "connectiondetailshere",
"transforms.SetSchemaMetadata.schema.name": "com.my.model.name"
}
Схема AVRO
{
"type": "record",
"name": "name",
"namespace": "com.my.model",
"fields": [<br>
{
"name": "is_active",
"type": "long"
}
],
"connect.name": "com.my.model.name"
}