Считать файл CSV с помощью kafka-connect и погрузиться в базу данных ms sql? - PullRequest
1 голос
/ 06 января 2020

Я работаю над ПО C, мне нужно прочитать CSV-файл и вставить его на сервер MS sql. Я создал конфигурацию ниже, но я получаю ниже исключения:

) \ nCaused by: org. apache .kafka.connect.errors.ConnectException: Схема значений должна иметь тип Struct

Конфигурация указана ниже:

1. Пример данных CSV-файла:

id,record1,record2,record3,created

1,1772056014794065487,160842,20668578,9999-12-31

2,1772056014794065487,160842,20668578,9999-12-31

3,1772056014794065487,160842,20668578,9999-12-31

4,1772056014794065487,160842,20668578,9999-12-31

5,1772056014794065487,160842,20668578,9999-12-31

2. коннектор файла-источника

{"name":"file-source",

"config":
         {
          "connector.class":"FileStreamSource",
          "tasks.max":"1",
          "file":"/tmp/my-connect-test.dat",
          "topic":"connect-test",
          "name":"file-source"},
          "tasks":[{"connector":"file-source","task":0}],
          "type":"source"}

3. jdb c -разъем для подключения:

{"name":"test-sink",
  "config":   {
         "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
         "tasks.max":"1",
         "topics":"connect-test",
         "topic.prefix":"connect-test", 
         "insert.mode":"insert",
         "table.name.format":"dz.temp_data",
         "pk.mode":"record_value",
         "pk.fields":"id",
         "incrementing.column.name":"id",
         "table.whitelist":"dz.temp_data",
         "mode":"incrementing",
         "key.converter.schemas.enable":"false",
         "value.converter.schemas.enable":"false",
         "key.converter":"io.confluent.connect.avro.AvroConverter",
         "value.converter":"io.confluent.connect.avro.AvroConverter",
         "key.converter.schema.registry.url":"http://localhost:8081",
         "value.converter.schema.registry.url":"http://localhost8081",
         "connection.url":"jdbc:sqlserver://**;databaseName=**;username=**;password=***",
         "name":"test-sink"},
         "tasks":[{"connector":"test-sink","task":0}],
         "type":"sink"}

4. мс sql таблица:

CREATE TABLE dz.temp_data (
  id INTEGER IDENTITY(1,1) NOT NULL PRIMARY KEY,
  record1 VARCHAR(255) NOT NULL,
   record2 VARCHAR(255) NOT NULL,
record3 VARCHAR(255) NOT NULL,
record4 VARCHAR(255) NOT NULL,
created VARCHAR(255) NOT NULL
);

Если я проверяю topi c пользователем avro, я получаю правильный вывод.

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning

Но я получаю исключение, пока вставка в базу данных ms sql.

Пожалуйста, помогите с этой проблемой. Заранее спасибо.

1 Ответ

0 голосов
/ 06 января 2020

Если вы используете мойку JDB C, у вас должна быть схема. Файлы CSV не имеют схемы. Таким образом, вам нужно будет определить один по пути.

Чтобы узнать больше о сериализации и схемах, см. здесь .

У вас есть несколько вариантов здесь:

  1. Использовать соединитель, который может применять схему при получении (например, kafka-connect-spooldir ). Если вы сделаете это, убедитесь, что вы используете AvroConverter или JSONConverter с schemas.enable=true для вашего value.converter в исходной конфигурации Kafka Connect. Вы можете увидеть пример этого в действии здесь .

  2. Используйте ksqlDB , чтобы применить вашу схему, а затем повторно выполнить сериализацию данных в другой топи. c в более подходящем формате (например, Avro) и используйте этот topi c для заполнения базы данных. Например:

    -- Declare a schema for the existing topic
    CREATE STREAM SOURCE_DATA (id INT, record1 VARCHAR, record2 VARCHAR, record3 VARCHAR, 
                               record4 VARCHAR, record5 VARCHAR) 
                  WITH (KAFKA_TOPIC='connect-test', VALUE_FORMAT='DELIMITED');
    
    -- Write a new Kafka topic that serialises all the data 
    -- from the first topic to a new one, in Avro
    CREATE STREAM SOURCE_DATA_RESERIALISED WITH (KAFKA_TOPIC='connect-test_avro',
                                                 VALUE_FORMAT='AVRO') AS 
      SELECT * FROM SOURCE_DATA;
    
...