Сбой режима метки времени соединителя источника JDBC Kafka для sqlite3 - PullRequest
0 голосов
/ 04 февраля 2019

Я попытался создать базу данных с двумя таблицами в sqlite.Однажды в моей таблице есть столбец метки времени.Я пытаюсь реализовать режим отметки времени для захвата постепенных изменений в БД.Соединение Kafka завершается с ошибкой ниже:

 ERROR Failed to get current time from DB using Sqlite and query 'SELECT 
CURRENT_TIMESTAMP' 
(io.confluent.connect.jdbc.dialect.SqliteDatabaseDialect:471)
java.sql.SQLException: Error parsing time stamp

Caused by: java.text.ParseException: Unparseable date: "2019-02-05 02:05:29" 
does not match (\p{Nd}++)\Q-\E(\p{Nd}++)\Q-\E(\p{Nd}++)\Q 
\E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q.\E(\p{Nd}++)

Большое спасибо за помощь

Конфиг:

name=test-query-sqlite-jdbc-autoincrement 
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector 
tasks.max=1 
connection.url=jdbc:sqlite:employee.db 
query=SELECT users.id, users.name, transactions.timestamp, transactions.payment_type FROM users JOIN transactions ON (users.id = transactions.user_id) 
mode=timestamp 
timestamp.column.name=timestamp 
topic.prefix=test-joined

DDL:

CREATE TABLE transactions(id integer primary key not null,
                          payment_type text not null,
                          timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
                          user_id int not null, 
                          constraint fk foreign key(user_id) references users(id)
); 

CREATE TABLE users (id integer primary key not null,name text not null);

Ответы [ 2 ]

0 голосов
/ 05 июля 2019

Соединитель kafka connect jdbc легко обнаруживает изменения в отметке времени, если значения столбца «отметка времени» имеют формат «отметки времени UNIX».

sqlite> CREATE TABLE transact(timestamp TIMESTAMP DEFAULT (STRFTIME('%s', 'now')) not null,
   ...> id integer primary key not null,
   ...> payment_type text not null);
sqlite>

Значения могут быть вставленыкак:

sqlite> INSERT INTO transact(timestamp,payment_type,id) VALUES (STRFTIME('%s', 'now'),'cash',1);

Изменения, связанные с меткой времени, затем обнаруживаются разъемом источника kafka jdbc, и то же самое можно использовать следующим образом:

kafka-console-consumer  --bootstrap-server localhost:9092 --topic jdbc-transact --from-beginning
{"timestamp":1562321516,"id":2,"payment_type":"card"}
{"timestamp":1562321790,"id":1,"payment_type":"online"}
0 голосов
/ 05 февраля 2019

Я воспроизвел это, и оно уже зарегистрировано как проблема для соединителя источника JDBC.Вы можете следить за этим здесь: https://github.com/confluentinc/kafka-connect-jdbc/issues/219

...