Confluent: тема не была создана после загрузки соединителя - PullRequest
0 голосов
/ 11 сентября 2018

Я попытался подключиться к нескольким базам данных, включая MySQL и MSSQL, и у меня не было проблем.

Но когда я попытался подключиться к определенной удаленной базе данных MySQL (доступ к которой возможен только из сети моей компании), которая содержит представление, содержащее почти 3 миллиона записей, Connector был загружен, и Status сказал, что он работает, но не было не любая тема, созданная для использования данных из нее.

В чем может быть причина? и где я могу найти правильный файл журнала, чтобы узнать, что случилось?

Connector Status

Available Topics

Вот пример того, как выглядит разъем:

{
  "name": "mysql-source",
   "config": {
   "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
   "key.converter": "io.confluent.connect.avro.AvroConverter",
   "key.converter.schema.registry.url": "http://localhost:8081",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url": "http://localhost:8081",
   "incrementing.column.name": "Id",
   "tasks.max": "1",
   "table.types": "VIEW",
   "table.whitelist": "ticket_rep",
   "mode": "incrementing",
   "topic.prefix": "mysql-",
   "name": "mysql-source",
   "validate.non.null": "false",
   "connection.url": "jdbc:mysql://XX.XXX.XX.XX:3306/database? 
    user=user&password=password"
 }

}

Это результаты журнала, когда я запускаю соединение с логом:

> [2018-09-11 16:37:57,382] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='foo', query='null', topicPrefix='mysql-', timestampColumn='null', incrementingColumn='id'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:247)
java.sql.SQLException: Java heap space
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:975)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1025)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
    at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
    at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-09-11 16:38:02,523] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='foo', query='null', topicPrefix='mysql-', timestampColumn='null', incrementingColumn='id'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:247)
com.mysql.cj.jdbc.exceptions.PacketTooBigException: Packet for query is too large (7,562,612 > 4,194,304). You can change this value on the server by setting the 'max_allowed_packet' variable.
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:107)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:975)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1025)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
    at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
    at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1 Ответ

0 голосов
/ 11 сентября 2018

com.mysql.cj.jdbc.exceptions.PacketTooBigException: Packet for query is too large - это проблема на стороне MySQL, которая может быть исправлена ​​путем увеличения значения переменной max_allowed_packet. Для этого нужно включить

max_allowed_packet=512M

в файле my.cnf (или my.ini, в зависимости от используемой ОС) (в разделе [mysqld]), а затем перезапустите MySQL. После перезапуска MySQL,

SHOW VARIABLES LIKE 'max_allowed_packet';

должно возвращать значение, которое вы установили в своем конфигурационном файле MySQL. Для получения более подробной информации об этой ошибке вы можете обратиться к документации MySQL .


java.sql.SQLException: Java heap space, указывает, что для соединения Kafka не хватает места в куче. Можно управлять начальным и максимальным размером кучи, запустив

KAFKA_HEAP_OPTS="-Xms512m -Xmx1g" connect-standalone connect-worker.properties mysql-source-connector.properties

, который устанавливает начальный размер кучи 512 МБ и максимальный размер 1 ГБ. Возможно, вам придется изменить размеры в соответствии с вашими потребностями.

...