основанный на запросе JDBC Source коннектор Kafka - PullRequest
1 голос
/ 02 июля 2019

У меня есть устаревшая база данных, в которой столбец первичного ключа является строкой (да, я знаю). Я хочу сделать режим дампа increment из базы данных postgres в темы kafka, используя JDBC kafka Source Connector

Ниже приведена моя попытка воссоздать проблему

create table test(
id varchar(20) primary key,
name varchar(10) 
);

INSERT INTO test(
    id, name)
VALUES ('1ab', 't'),
('2ab', 't'),
('3ab', 't')

Мой конфиг

{"name" : "test_connector",
    "config" : {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://12.34.5.6:5432/",
        "connection.user": "user",
        "connection.password": "password",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topic.prefix": "incre_",
        "mode": "incrementing",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "query" :"SELECT cast(replace(id, 'ab','') as integer) as id , name from test ORDER BY id ASC",
        "incrementing.column.name":"id",
        "value.converter.schema.registry.url": "http://schema-registry_url.com",
        "key.converter.schema.registry.url": "http://schema-registry_url.com",
        "offset.flush.timeout.ms": 2000,

    }
}

После того, как я опубликовал конфигурацию, статус был RUNNING, когда я делал HTTP-скручивание. Там также нет журнала ошибок в журнале работника, когда я его проверил Там также нет данных в теме кафки, когда я пытался сделать консоль-потребителя Я также попробовал несколько других комбинаций, таких как добавление в "table.whitelist": "test".

Еще одна вещь, которую я попробовал, - переход по этим двум ссылкам. https://rmoff.net/2018/05/21/kafka-connect-and-oracle-data-types/ https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector но ничто не поможет, даже умный трюк, который был предложен как SELECT * from (SELECT id, name from test where ...)

Ответы [ 2 ]

0 голосов
/ 18 июля 2019

Так что через несколько часов играешь с другой конфигурацией. Я возвращаюсь к официальному документу и понял это

Используйте пользовательский запрос вместо загрузки таблиц, что позволяет объединять данные из нескольких таблиц. Пока запрос не включает собственную фильтрацию, вы все равно можете использовать встроенные режимы для инкрементных запросов (в этом случае, используя столбец отметки времени). Обратите внимание, что это ограничивает вас одним выходом для каждого соединителя, и поскольку имя таблицы отсутствует, в данном случае тема «префикс» фактически является полным именем темы.

Итак, ключ в том, что "topic.prefix": "incre_test"

Следуйте предыдущим настройкам, правильная конфигурация должна быть

{"name" : "test_connector",
    "config" : {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://12.34.5.6:5432/",
        "connection.user": "user",
        "connection.password": "password",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topic.prefix": "incre_test",
        "mode": "incrementing",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "query" :"SELECT cast(replace(id, 'ab','') as integer) as id , name from test ORDER BY id ASC",
        "incrementing.column.name":"id",
        "value.converter.schema.registry.url": "http://schema-registry_url.com",
        "key.converter.schema.registry.url": "http://schema-registry_url.com",
        "offset.flush.timeout.ms": 2000,

    }
}
0 голосов
/ 02 июля 2019

Боюсь, вы не можете использовать свой varchar id в режиме incrementing, поскольку он не является увеличивающимся столбцом / типом. Согласно Confluent Docs ,

Увеличивающийся столбец: Один столбец, содержащий уникальный идентификатор для каждой строки, где более новые строки гарантированно будут иметь более высокие идентификаторы, т.е. AUTOINCREMENT столбец. Обратите внимание, что этот режим может обнаруживать только новые строки. Обновления существующих строк не могут быть обнаружены, поэтому этот режим должен только быть использованы для неизменных данных. Один из примеров, где вы можете использовать этот режим при потоковой передаче таблиц фактов в хранилище данных, так как они обычно только для вставки.

...