Я работаю над ПО C, мне нужно прочитать CSV-файл и вставить его на сервер MS sql. Я создал конфигурацию ниже, но я получаю ниже исключения:
) \ nCaused by: org. apache .kafka.connect.errors.ConnectException: Схема значений должна иметь тип Struct
Конфигурация указана ниже:
1. Пример данных CSV-файла:
id,record1,record2,record3,created
1,1772056014794065487,160842,20668578,9999-12-31
2,1772056014794065487,160842,20668578,9999-12-31
3,1772056014794065487,160842,20668578,9999-12-31
4,1772056014794065487,160842,20668578,9999-12-31
5,1772056014794065487,160842,20668578,9999-12-31
2. коннектор файла-источника
{"name":"file-source",
"config":
{
"connector.class":"FileStreamSource",
"tasks.max":"1",
"file":"/tmp/my-connect-test.dat",
"topic":"connect-test",
"name":"file-source"},
"tasks":[{"connector":"file-source","task":0}],
"type":"source"}
3. jdb c -разъем для подключения:
{"name":"test-sink",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"1",
"topics":"connect-test",
"topic.prefix":"connect-test",
"insert.mode":"insert",
"table.name.format":"dz.temp_data",
"pk.mode":"record_value",
"pk.fields":"id",
"incrementing.column.name":"id",
"table.whitelist":"dz.temp_data",
"mode":"incrementing",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter.schema.registry.url":"http://localhost8081",
"connection.url":"jdbc:sqlserver://**;databaseName=**;username=**;password=***",
"name":"test-sink"},
"tasks":[{"connector":"test-sink","task":0}],
"type":"sink"}
4. мс sql таблица:
CREATE TABLE dz.temp_data (
id INTEGER IDENTITY(1,1) NOT NULL PRIMARY KEY,
record1 VARCHAR(255) NOT NULL,
record2 VARCHAR(255) NOT NULL,
record3 VARCHAR(255) NOT NULL,
record4 VARCHAR(255) NOT NULL,
created VARCHAR(255) NOT NULL
);
Если я проверяю topi c пользователем avro, я получаю правильный вывод.
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
Но я получаю исключение, пока вставка в базу данных ms sql.
Пожалуйста, помогите с этой проблемой. Заранее спасибо.