Как добавить явное предложение WHERE в соединитель источника JDBC Kafka Connect - PullRequest
1 голос
/ 17 июня 2019

Я использую kafka connect для получения исходных данных из DB2 в теме kafka, и я настраиваю sql-запрос для чтения данных из DB2, ниже приведен запрос

SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL'

, использующий параметр "timestamp.column.name": "CREATE_TS", здесь проблема заключается вв запросе это уже WHERE предложение, и kafka connect попытался добавить еще одно предложение where со столбцом timestamp, и это создает проблему, и еще одна проблема заключается в том, если я удалю предложение where из предложения sql, как показано ниже

SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR

тогда получаю ошибку с substr, как показано ниже

SQL Error [22011]: THE SECOND OR THIRD ARGUMENT OF THE SUBSTR OR SUBSTRING FUNCTION IS OUT OF RANGE. SQLCODE=-138, SQLSTATE=22011, DRIVER=4.19.26

Может кто-нибудь предложить по обеим этим вопросам, я застрял на этом этапе.

1 Ответ

1 голос
/ 17 июня 2019

Это происходит потому, что вы пытаетесь использовать "mode": "timestamp" и query.TimestampIncrementingTableQuerier добавляет к запросу предложение WHERE, которое конфликтует с существующими предложениями WHERE в query.

Документация по исходному соединителю JDBC понятна по этому вопросу:

query

Если указано, запрос для выбора нового илиобновленные строки.Используйте этот параметр, если вы хотите объединить таблицы, выбрать подмножества столбцов в таблице или отфильтровать данные.При использовании этот соединитель будет копировать данные только по этому запросу - копирование всей таблицы будет отключено.Различные режимы запросов все еще могут использоваться для инкрементных обновлений, но для правильного построения инкрементного запроса должна быть возможность добавить к этому запросу предложение WHERE (то есть нельзя использовать предложения WHERE). Если вы используете предложение WHERE, оно само должно обрабатывать инкрементные запросы .

В качестве обходного пути вы можете изменить свой запрос на (в зависимости от используемого SQL-варианта)

SELECT * FROM ( SELECT * FROM table WHERE ...)

или

WITH a AS
   SELECT * FROM b
    WHERE ...
SELECT * FROM a

Например, в вашем случае запрос должен быть

"query":"SELECT * FROM (SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL') o"
...