Соединитель JDBC Kafka-Connect tinyint для логического отображения - PullRequest
0 голосов
/ 02 июля 2019

У меня задание Kafka-Connect, настроенное на периодический запрос таблицы MySQL и размещение сообщений в очереди.Структура этих сообщений определяется с помощью схемы Avro.У меня проблема с сопоставлением для одного из моих столбцов.

Столбец определен как tinyint (1) в моей схеме MySQL, и я пытаюсь сопоставить его с логическим полем в моем объекте avro..

enter image description here

{ "name": "is_active", "type": "boolean" }

Задания kafka-connect запускаются, и сообщения помещаются в очередь, но когда мое приложениекто читает из очереди пытается десериализовать сообщения, я получаю следующую ошибку:

org.apache.avro.AvroTypeException: Found int, expecting boolean

Я надеялся, что значение 1 или 0 может быть автоматически сопоставлено с логическим значением, но это не похоже наcase.

Я также пытался настроить свою работу на использование преобразования «Cast», но, похоже, это вызвало проблемы с другими полями в сообщении.

"transforms": "Cast", "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value", "transforms.Cast.spec": "is_active:boolean"

Возможно ли то, что я пытаюсь сделать, или мне придется изменить свое приложение для работы со значением int?

Вот моя полная конфигурация (я удалил некоторые другие не относящиеся к делу поля)

Конфигурация задания Kafka Connect

{ "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "bulk", "topic.prefix": "my_topic-name", "transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value", "query": "select is_active from my_table", "poll.interval.ms": "30000", "transforms": "SetSchemaMetadata", "name": "job_name", "connection.url": "connectiondetailshere", "transforms.SetSchemaMetadata.schema.name": "com.my.model.name" }

Схема AVRO

{ "type": "record", "name": "name", "namespace": "com.my.model", "fields": [<br> { "name": "is_active", "type": "long" } ], "connect.name": "com.my.model.name" }

1 Ответ

0 голосов
/ 02 июля 2019

Вы можете сделать это либо с помощью пользовательского преобразования (это идеальный вариант использования), либо написать простое потоковое приложение для этого, например, в KSQL:

CREATE STREAM my_topic AS 
  SELECT COL1, COL2, …
         CASE WHEN is_active=1 THEN TRUE ELSE FALSE AS is_active_bln
  FROM my_source_connect_topic;
ksql> describe my_topic;

Name                 : my_topic
 Field         | Type
-----------------------------------------
 ROWTIME       | BIGINT           (system)
 ROWKEY        | VARCHAR(STRING)  (system)
 COL1          | INTEGER
 COL1          | VARCHAR
 IS_ACTIVE_BLN | BOOLEAN
----------------------------------------
...