У меня есть устаревшая база данных, в которой столбец первичного ключа является строкой (да, я знаю). Я хочу сделать режим дампа increment
из базы данных postgres в темы kafka, используя JDBC kafka Source Connector
Ниже приведена моя попытка воссоздать проблему
create table test(
id varchar(20) primary key,
name varchar(10)
);
INSERT INTO test(
id, name)
VALUES ('1ab', 't'),
('2ab', 't'),
('3ab', 't')
Мой конфиг
{"name" : "test_connector",
"config" : {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://12.34.5.6:5432/",
"connection.user": "user",
"connection.password": "password",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"topic.prefix": "incre_",
"mode": "incrementing",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"query" :"SELECT cast(replace(id, 'ab','') as integer) as id , name from test ORDER BY id ASC",
"incrementing.column.name":"id",
"value.converter.schema.registry.url": "http://schema-registry_url.com",
"key.converter.schema.registry.url": "http://schema-registry_url.com",
"offset.flush.timeout.ms": 2000,
}
}
После того, как я опубликовал конфигурацию, статус был RUNNING
, когда я делал HTTP-скручивание. Там также нет журнала ошибок в журнале работника, когда я его проверил
Там также нет данных в теме кафки, когда я пытался сделать консоль-потребителя
Я также попробовал несколько других комбинаций, таких как добавление в "table.whitelist": "test"
.
Еще одна вещь, которую я попробовал, - переход по этим двум ссылкам.
https://rmoff.net/2018/05/21/kafka-connect-and-oracle-data-types/
https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector но ничто не поможет, даже умный трюк, который был предложен как SELECT * from (SELECT id, name from test where ...)