confluent - kafka-connect - соединитель источника JDBC - ORA-00933: команда SQL не завершена должным образом - PullRequest
0 голосов
/ 19 сентября 2018

В моем файле свойств исходного соединителя kafka jdbc есть следующий SQL-запрос:

query=SELECT * FROM JENNY.WORKFLOW where ID = '565231'

Если я выполняю тот же запрос в SQL-разработчике, он работает нормально и извлекает результаты.Но если я использую тот же запрос в «jdbc_workflow_connect.properties», получаю следующую ошибку:

(io.confluent.connect.jdbc.source.JdbcSourceTaskConfig:223)
[2018-09-19 12:32:15,130] INFO WorkerSourceTask{id=Workflow-DB-source-0} 
Source task finished initialization and start 
(org.apache.kafka.connect.runtime.WorkerSourceTask:158)
[2018-09-19 12:32:15,328] ERROR Failed to run query for table 
TimestampIncrementingTableQuerier{name='null', query='SELECT * FROM 
JENNY.WORKFLOW where ID = '565231'', topicPrefix='workflow_data1', 
timestampColumn='null', incrementingColumn='ID'}: {} 
(io.confluent.connect.jdbc.source.JdbcSourceTask:247)
java.sql.SQLSyntaxErrorException: ORA-00933: SQL command not properly ended

at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1017)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:655)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58)
at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:776)
at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:897)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1034)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3820)
at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3867)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1502)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Вот содержимое моего файла свойств исходного соединителя JDBC:

name=Workflow-DB-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.password = ******
connection.url = jdbc:oracle:thin:@1.1.1.1:****/****
connection.user = *****
table.types=TABLE
query=SELECT * FROM JENNY.WORKFLOW where ID = '565231'

mode=incrementing
incrementing.column.name=ID
topic.prefix=workflow_data1
timestamp.delay.interval.ms=60000

transforms:createKey
transforms.createKey.type:org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields:ID

I 'm, используя ojdbc7.jar

Замечание:

Если я удалю предложение "WHERE", запрос будет работать нормально (как показано ниже):

SELECT * FROM JENNY.WORKFLOW

Пожалуйста, дайте мнезнать, что я делаю что-то не так или какие-либо изменения, необходимые для настройки в исходном разъеме jdbc.

Заранее спасибо.

1 Ответ

0 голосов
/ 19 сентября 2018

Из документации Параметры конфигурации JDBC Connect вы можете прочитать

Если указано, запрос выполнять для выбора новых или обновленных строк.Используйте этот параметр, если вы хотите объединить таблицы, выбрать подмножества столбцов в таблице или отфильтровать данные.При использовании этот соединитель будет копировать данные только по этому запросу - копирование всей таблицы будет отключено.Различные режимы запросов все еще могут использоваться для инкрементных обновлений, но для правильного построения инкрементного запроса должна быть возможность добавить к этому запросу предложение WHERE (то есть нельзя использовать предложения WHERE).

Так что, если вы действительно хотите рассмотреть только часть таблицы с данным ID, вы должны заключить запрос в следующую строку

 select * from (SELECT * FROM JENNY.WORKFLOW where ID = '565231')

Но, пожалуйста, убедитесь, что вы проверилидокументация Опции конфигурации и вам известна роль параметра query.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...