Кафка коннект можно ли использовать пользовательский запрос в режиме навалом? - PullRequest
1 голос
/ 18 октября 2019

Я пытаюсь отправить запись для каждой строки 7 дней. Это конфигурация, над которой я работал, но она не работает, даже если запрос создает записи на сервере БД.

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 1,
    "mode": "bulk",
    "connection.url": "jdbc:mysql://mysql:3300/test_db?user=root&password=password",
    "query": "SELECT * FROM test_table WHERE DATEDIFF(CURDATE(), test_table.modified) = 7;",
    "topic.prefix": "test-jdbc-",
    "poll.interval.ms": 10000
}

1 Ответ

1 голос
/ 18 октября 2019

Соединитель источника JDBC импортирует данные из реляционной базы данных в раздел Apache Kafka с помощью драйвера JDBC. Данные загружаются периодически либо с приращением на основе временной метки, либо с массовой загрузкой. Изначально, несмотря на увеличение или увеличение режима при создании JDBC-коннектора, он загружает все данные в тему, после чего он загружает только новые или измененные строки в столбце метки времени.

Bulk: Этот режим не фильтруется иследовательно, не инкрементный вообще. Он будет загружать все строки из таблицы на каждой итерации. Это может быть полезно, если вы хотите периодически выгружать всю таблицу, где записи в конечном итоге удаляются, а последующая система может безопасно обрабатывать дубликаты. Это означает, что вы не можете загружать последние 7 дней с использованием массового режима

Столбец отметки времени : В этом режиме для отслеживания времени последней обработки данных используется один столбец, содержащий отметку времени изменения. и запрашивать только те строки, которые были изменены с того времени. Здесь вы можете загрузить дополнительные данные. Но как это работает при первом создании, он загружает все данные, доступные в таблице базы данных, потому что для коннектора JDBC это новые данные. Позже он будет загружать только новые или измененные данные.

Теперь, согласно вашему требованию, вы пытаетесь загрузить все данные через некоторый интервал времени, который настроен на "poll.interval.ms": 10000. Я вижу вашиНастройка соединения JDBC соответствует определению, тогда как запрос может не работать, попробуйте использовать запрос, как показано ниже. Выглядит запрос переноса JDBC-коннектора в виде таблицы, которая не работает, если вы добавляете регистр.

"query": "select * from (select * from test_table where  modified > now() - interval '7' day) o",

Попробуйте установить ниже

{
  "name": "application_name",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "connection.url": "jdbc:mysql://mysql:3300/test_db",
  "connection.user": "root",
  "connection.password": "password",
  "connection.attempts": "1",
  "mode": "bulk",
  "validate.non.null": false,
  "query": "select * from (select * from test_table where  modified > now() - interval '7' day) o",
  "table.types": "TABLE",
  "topic.prefix": "test-jdbc-",
 "poll.interval.ms": 10000
  "schema.ignore": true,
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false"

}
...