Ошибка при включении вложенного Avro в таблицу MS SQL с использованием kafka-connect-jdbc в kafka - PullRequest
0 голосов
/ 24 декабря 2018

Как часть POC, я пытаюсь вставить сообщения Avro с включенным реестром схемы из раздела Kafka Topics в JDBC Sink (база данных MS SQL). Но я сталкиваюсь с некоторыми проблемами при загрузке вложенных данных avro в таблицу MS Sql.Я использую kafka-connect-jdbc-sink для загрузки данных avro в таблицу MS Sql от Kafka Avro Console Producer.

Подробности, указанные ниже

Команда CLI Kafka Avro Producer

    kafka-avro-console-producer --broker-list server1:9092, server2:9092,server3:9092 --topic testing25 --property schema.registry.url=http://server3:8081 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"tradeid","type":"int"},{"name":"tradedate", "type": "string"}, {"name":"frontofficetradeid", "type": "int"}, {"name":"brokerorderid","type": "int"}, {"name":"accountid","type": "int"}, {"name": "productcode", "type": "string"}, {"name": "amount", "type": "float"}, {"name": "trademessage", "type": { "type": "array", "items": "string"}}]}'

JDBC-Sink.properties

        name=test-sink
        connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
        tasks.max=1
        topics=testing25     connection.url=jdbc:sqlserver://testing;DatabaseName=testkafkasink;user=Testkafka
        insert.mode=upsert
        pk.mode=record_value
        pk.fields=tradeid
        auto.create=true
        tranforms=FlattenValueRecords
        transforms.FlattenValueRecords.type=org.apache.kafka.connect.transforms.Flatten$Value
        transforms.FlattenValueRecords.field=trademessage

connect-avro-standalone.properties

bootstrap.servers=server1:9092,server2:9092,server3:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://server3:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://server3:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/usr/share/java

Таким образом, после запуска jdbc-sink и производителя, когда я пытаюсь вставить данные в cli, я получаю эту ошибку

ERROR WorkerSinkTask{id=test-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:584)
org.apache.kafka.connect.errors.ConnectException: null (ARRAY) type doesn't have a mapping to the SQL database column type

Я понимаю, что сбой в типе данных Array, как это делает SQL Serverне содержат ни одного такого типа данных.Поэтому я исследовал и обнаружил, что мы можем использовать функциональность Kafka Connect SMT (Single Message Transform) (сглаживать), чтобы сгладить вложенные значения.

Но, похоже, это не работает в моем случае.Значения преобразования, переданные в JDBC-приемнике, ничего не делают.Infact я тестировал с другими преобразованиями, а также InsertField $ Value и InsertField $ Key, но ни одно из них не работает.Пожалуйста, дайте мне знать, если я делаю что-то не так в выполнении этих преобразований в Kafka Connect.

Любая помощь будет оценена.

Спасибо

...