kafka-connect-jdbc не получает последовательную метку времени из источника - PullRequest
0 голосов
/ 14 января 2019

Я использую kafka-connect-jdbc-4.0.0.jar и postgresql-9.4-1206-jdbc41.jar

конфигурация разъема kafka connect

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "timestamp",
  "timestamp.column.name": "updated_at",
  "topic.prefix": "streaming.data.v2",
  "connection.password": "password",
  "connection.user": "user",
  "schema.pattern": "test",
  "query": "select * from view_source",
  "connection.url": "jdbc:postgresql://host:5432/test?currentSchema=test"
}

Я настроил два соединителя, один источник и другой приемник, используя драйвер jdbc, для базы данных postgresql («PostgreSQL 9.6.9») все работает правильно

У меня есть сомнения в том, как соединитель собирает исходные данные, глядя на журнал, я вижу, что между выполнением запросов есть разница во времени 21 секунда

11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > ? AND "updated_at" < ? ORDER BY "updated_at" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:17:07.000 end time = 2019-01-11 08:20:18.985 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:19[2019-01-11 08:20:19,070] DEBUG Resetting querier TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)

11/1/2019 9:20:49[2019-01-11 08:20:49,499] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > ? AND "updated_at" < ? ORDER BY "updated_at" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:20:39.000 end time = 2019-01-11 08:20:49.500 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)

первый запрос собирает данные между 08: 17: 07.000 и 08: 20: 18.985, но второй собирает данные между 08: 20: 39.000 и 08: 20: 49.500 .. между ними существует разница в 21 секунда, в которой там могут быть записи ...

11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:17:07.000 end time = 2019-01-11 08:20:18.985 
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:20:39.000 end time = 2019-01-11 08:20:49.500 

Я предполагаю, что одна из данных является последней полученной записью, а другая - меткой времени момента

Не могу найти объяснение этому Это нормальная работа разъема? Должны ли вы предполагать, что вы не всегда будете собирать всю информацию?

1 Ответ

0 голосов
/ 14 января 2019

Соединитель JDBC не гарантирует получение каждого сообщения. Для этого вам понадобится регистрация изменений на основе журнала. Для Postgres, предоставляемых Debezium и Kafka Connect. Вы можете прочитать больше об этом здесь .

Отказ от ответственности: я работаю на Confluent, и написал вышеупомянутый блог

...