Тема Кафки, кажется, функционирует только в первый раз. Почему? - PullRequest
1 голос
/ 12 октября 2019

Я работаю с Kafka Connect (используя реализацию Confluent) и наблюдаю странное поведение. Я настраиваю исходное соединение, чтобы извлечь данные из таблицы БД и заполнить тему. Это работает. Но если я удаляю тему, удаляю конфигурацию источника, а затем сбрасываю конфигурацию (возможно, добавив еще один столбец в запрос), тема не заполняется. Если я изменю название темы на то, что раньше не использовал, это сработает. Я использую Postman для настройки конфигурации, хотя я не думаю, что это имеет значение.

My Connect config:

{
    "name": "my-jdbc-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:db2://db2server.mycompany.com:4461/myDB",
        "connection.user: "dbUser",
        "connection.password": "dbPass",
        "dialect.name": "Db2DatabaseDialect",
        "mode": "timestamp",
        "query": "select fname, lname, custId, custRegion, lastUpdate from CustomerMaster",
        "timestamp.column.name": "lastUpdate",
        "table.types": "TABLE",
        "topic.prefix": "master.customer"
    }
}

1 Ответ

0 голосов
/ 13 октября 2019

Соединитель KAFKA JDBC использует HighWatermark для столбца метки времени, т.е. lastUpdate в вашем случае. Это не зависит от темы, или даже вы можете удалить JDBC-соединитель и воссоздать его с тем же именем, он все равно будет использовать тот же HighWatermark, потому что HighWatermark зависит от имени соединителя. Так что даже если вы воссоздаете тему, она не будет загружать данные снова. Таким образом, существует способ повторной обработки целых данных, который вы можете выполнить любым из следующих способов:

  1. Удалить тему и удалить JDBC-коннектор, заново создать тему и создать JDBC-коннектор с другим именем. или

  2. Удалите соединитель JDBC и заново создайте его с тем же именем в режиме «режим»: «объем». Он снова сбросит всю таблицу базы данных в теме. как только он загрузится, вы можете снова обновить режим до отметки времени. Пожалуйста, обратитесь к информации о конфигурации разъема JDBC

https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html

обновить lastUpdate для всех записей до текущей отметки времени.
...