Как я могу использовать временную метку или временную метку + инкрементный режим для разъема источника подключения kafka? - PullRequest
1 голос
/ 10 мая 2019

У меня есть отношение к базе данных (Mariadb) со столбцом, «измененным» как «bigint (10)», который представляет метку времени, я полагаю, в формате времени unix. Когда я пытаюсь запустить соединитель источника kafka с режимом «отметка времени» или «отметка времени + увеличение», в тему не добавляются события. Если я запускаю только инкремент, новые записи помещаются в тему. Может мне кто-нибудь намекнуть, где я неправильно настроил разъем? Или разъем не распознает метки времени в формате unix time?

Я попытался запустить соединитель (поиск основан только на отметке времени) со следующими свойствами:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name":"only_ts",
        "config": {
            "numeric.mapping": "best_fit",
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:mysql://mariadb/moodle",
            "connection.user": "user",
            "connection.password": "",
            "topic.prefix": "only_ts_",
            "mode": "timestamp", 
            "timestamp.column.name":"modified", 
            "table.whitelist":"mdl_forum_posts",
            "poll.intervals.ms": 10000
        }
  }'

Я ожидал бы увидеть записи из "mdl_forum_posts", которые будут помещены в тему kafka "only_ts_mdl_forum_posts" всякий раз, когда я создаю запись или обновляю запись. Однако с помощью этого разъема ничего не происходит. Если я использую только режим «приращения», это работает нормально и как ожидалось. Но чтобы получить БД ОБНОВЛЕНИЯ, мне нужно добавить метку времени режима.

Вывод для "description mdl_forum_posts"

+---------------+--------------+------+-----+---------+----------------+

| Field         | Type         | Null | Key | Default | Extra          |

+---------------+--------------+------+-----+---------+----------------+

| id            | bigint(10)   | NO   | PRI | NULL    | auto_increment |

| discussion    | bigint(10)   | NO   | MUL | 0       |                |

| parent        | bigint(10)   | NO   | MUL | 0       |                |

| userid        | bigint(10)   | NO   | MUL | 0       |                |

| created       | bigint(10)   | NO   | MUL | 0       |                |

| modified      | bigint(10)   | NO   |     | 0       |                |

| mailed        | tinyint(2)   | NO   | MUL | 0       |                |

| subject       | varchar(255) | NO   |     |         |                |

| message       | longtext     | NO   |     | NULL    |                |

| messageformat | tinyint(2)   | NO   |     | 0       |                |

| messagetrust  | tinyint(2)   | NO   |     | 0       |                |

| attachment    | varchar(100) | NO   |     |         |                |

| totalscore    | smallint(4)  | NO   |     | 0       |                |

| mailnow       | bigint(10)   | NO   |     | 0       |                |

| deleted       | tinyint(1)   | NO   |     | 0       |                |

+---------------+--------------+------+-----+---------+----------------+

И вывод для "show create table moodle.mdl_forum_posts;":

| mdl_forum_posts | CREATE TABLE mdl_forum_posts (

  id bigint(10) NOT NULL AUTO_INCREMENT,

  discussion bigint(10) NOT NULL DEFAULT '0',

  parent bigint(10) NOT NULL DEFAULT '0',

  userid bigint(10) NOT NULL DEFAULT '0',

  created bigint(10) NOT NULL DEFAULT '0',

  modified bigint(10) NOT NULL DEFAULT '0',

  mailed tinyint(2) NOT NULL DEFAULT '0',

  subject varchar(255) NOT NULL DEFAULT '',

  message longtext NOT NULL,

  messageformat tinyint(2) NOT NULL DEFAULT '0',

  messagetrust tinyint(2) NOT NULL DEFAULT '0',

  attachment varchar(100) NOT NULL DEFAULT '',

  totalscore smallint(4) NOT NULL DEFAULT '0',

  mailnow bigint(10) NOT NULL DEFAULT '0',

  deleted tinyint(1) NOT NULL DEFAULT '0',

  PRIMARY KEY (id),

  KEY mdl_forupost_use_ix (userid),

  KEY mdl_forupost_cre_ix (created),

  KEY mdl_forupost_mai_ix (mailed),

  KEY mdl_forupost_dis_ix (discussion),

  KEY mdl_forupost_par_ix (parent)

) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COMMENT='All posts are stored in this table' |

Пример записи в столбце «изменен»:

select modified from mdl_forum_posts;
1557487199

Это метка времени в Unix, как, кажется, показывает следующее:

select from_unixtime(modified) from mdl_forum_posts;
2019-05-10 11:19:59

Похоже, что в соответствующем журнале, касающемся соответствующего соединителя (только отметка времени), есть какой-то запрос?

kafka-connect_1    | [2019-05-10 11:48:47,434] DEBUG TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} prepared SQL query: SELECT * FROM `moodle`.`mdl_forum_posts` WHERE `moodle`.`mdl_forum_posts`.`modified` > ? AND `moodle`.`mdl_forum_posts`.`modified` < ? ORDER BY `moodle`.`mdl_forum_posts`.`modified` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
kafka-connect_1    | [2019-05-10 11:48:47,435] DEBUG Resetting querier TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} (io.confluent.connect.jdbc.source.JdbcSourceTask)
...