Kafka Connect - JDB C Source Connector - Настройка схемы Avro - PullRequest
1 голос
/ 03 августа 2020

Как я могу сделать Kafka Connect JDB C коннектором для предопределенной схемы Avro? Он создает новую версию при создании коннектора. Читаю из DB2 и кладу в Kafka topi c. Я задаю имя и версию схемы во время создания, но это не работает !!! Вот мои настройки коннектора:


    {
      "name": "kafka-connect-jdbc-db2-tst-2",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:db2://mydb2:50000/testdb",
        "connection.user": "DB2INST1",
        "connection.password": "12345678",
        "query":"SELECT CORRELATION_ID FROM TEST.MYVIEW4 ",
        "mode": "incrementing",
        "incrementing.column.name": "CORRELATION_ID",
        "validate.non.null": "false",
        "topic.prefix": "tst-4" ,
        <B>"auto.register.schemas": "false",
        "use.latest.version": "true",</B>
        "transforms": "RenameField,SetSchemaMetadata",
        "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.RenameField.renames": "CORRELATION_ID:id",
     <B>   "transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
        "transforms.SetSchemaMetadata.schema.name": "foo.bar.MyMessage",
        "transforms.SetSchemaMetadata.schema.version": "1"
    </B>
      }
    
    }

А вот схемы: V.1 мой, а V.2 создается JDB C исходный коннектор:


    $ curl localhost:8081/subjects/tst-4-value/versions/1 | jq .
    
    {
      "subject": "tst-4-value",
      "version": 1,
      "id": 387,
      "schema": "{"type":"record","name":"MyMessage",
    "namespace":"foo.bar","fields":[{"name":"id","type":"int"}]}"
    }
    
    $ curl localhost:8081/subjects/tst-4-value/versions/2 | jq .
    {
      "subject": "tst-4-value",
      "version": 2,
      "id": 386,
      "schema": "{"type":"record","name":"MyMessage","namespace":"foo.bar",
       "fields":[{"name":"id","type":"int"}],
       "connect.version":1,
       "connect.name":"foo.bar.MyMessage"
    }"
    }

Есть идеи, как заставить соединитель Kafka использовать мою схему? Заранее спасибо,

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...