Может ли тип данных varchar быть меткой времени в Confluent? - PullRequest
0 голосов
/ 20 декабря 2018

Я использую слияние для реализации ETL в реальном времени.Мой источник данных - oracle, в каждой таблице есть столбец с именем ts, тип данных - varchar, но данные в этом столбце имеют формат YYYY-MM - DD HH24: MI: SS.можно использовать этот столбец в качестве метки времени в соединителе кафки?как настроить файл xxxxx.properties?

mode=timestamp
query= select to_date(a.ts,'yyyy-mm-dd hh24:mi:ss') tsinc,a.* from TEST_CORP a
poll.interval.ms=1000 
timestamp.column.name=tsinc

1 Ответ

0 голосов
/ 25 декабря 2018

connector.class = io.confluent.connect.jdbc.JdbcSourceConnector запрос = выберите * из режима NFSN.BD_CORP = timestamp poll.interval.ms = 3000 timestamp.column.name = TS topic.prefix = t_ validate.non.null = false

тогда я получаю эту ошибку:

[2018-12-25 14: 39: 59,756] ИНФОРМАЦИЯ После фильтрации таблицы: (io.confluent.connect.jdbc.source.TableMonitorThread: 175) [2018-12-25 14: 40: 01,383] ОТЛАДКА Проверка следующего блока результатов из TimestampIncrementingTableQuerier {table = null, query = 'select * from NFSN.BD_CORP', topicPrefix = 't_', incrementingColumn = '', timestampColumns = [TS]} (io.confluent.connect.jdbc.source.JdbcSourceTask: 291) [2018-12-25 14: 40: 01,386] DEBUG TimestampIncrementingTableQuier {таблицаnull, query = 'select * from NFSN.BD_CORP', topicPrefix = 't_', incrementingColumn = '', timestampColumns = [TS]} подготовленный SQL-запрос: выберите * из NFSN.BD_CORP ГДЕ "TS">?И "ТС" <?ORDER BY "TS" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier: 161) [2018-12-25 14: 40: 01,386] DEBUG при выполнении запроса выберите CURRENT_TIMESTAMP из dual, чтобы получить текущее время из базы данных (io.confluent.connect.jdbc.dialect.OracleDatabaseDialect: 462) [2018-12-25 14: 40: 01,388] DEBUG Выполнение подготовленного оператора со значением метки времени = 1970-01-01 00: 00: 00.000 время окончания = 2018-12-25 06: 40: 43.828 (io.confluent.connect.jdbc.source.TimestampIncrementingCriteria: 162) [2018-12-25 14: 40: 01,389] ОШИБКА Не удалось выполнить запрос для таблицы TimestampIncrementingTableQuerier {table = null, query = 'select * fromNFSN.BD_CORP ', topicPrefix =' t_ ', incrementingColumn =' ', timestampColumns = [TS]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask: 314) java.sql.SQLDataException: ORA-01843:недопустимый месяц </p>

    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:447)
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:396)
    at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:951)
    at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:513)
    at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:227)
    at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:531)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:208)
    at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:886)
    at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1175)
    at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1296)
    at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3613)
    at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3657)
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1495)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:168)
    at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:88)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:60)
    at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:292)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748) [2018-12-25 14:40:01,390] DEBUG Resetting querier

TimestampIncrementingTableQuerier {table = null, query = 'select * from NFSN.BD_CORP', topicPrefix = 't_', incrementingColumn = '', timestampColumns = [TS]} (io.confluent.connect.jdbc.source.JdbcSourceTask: 332) ^ C [2018-12-25 14: 40: 03,826] ИНФОРМАЦИЯ Остановка Kafka Connect (org.apache.kafka.connect.runtime.Connect: 65) [2018-12-25 14: 40: 03,827] ИНФОРМАЦИЯ Остановка сервера REST (org.apache.kafka.connect.runtime.rest.RestServer: 223)

...