Kafka Connect - Не удалось очистить, истекло время ожидания / Не удалось зафиксировать смещения - PullRequest
0 голосов
/ 14 сентября 2018

Я получаю следующую ошибку:

"ОШИБКА WorkerSourceTask (id = test-mysql-dbc-source-0) Не удалось сбросить, истекло время ожидания ожидания производителя для сброса ожидающих сообщений N.

ОШИБКА Не удалось зафиксировать смещения. (Org.apache.kafka.connect.runtime.WorkerSourceTask: XXX) "

Настройка:

У меня естьЭкземпляр EC2 в AWS (t2.medium - 2 ядра 4 ГБ ОЗУ), который служит Kafka Server.Другой экземпляр EC2 имеет базу данных MySQL для песочницы и Kafka Connect с Confluent JDBC Source Connector.Скрипт Python вставляет пару строк в таблицу случайным образом, чтобы имитировать некоторую активность.На моем ноутбуке я открыл Kafka Console Consumer для чтения сообщений с Kafka Server.

Порты 22, 3306, 9092, 2888 открыты для всех IP-адресов в обоих экземплярах EC2.

Ниже приведены настройкифайлы, которые я использовал для Kafka Connect Source

Файлы конфигурации:

connect-standalone.properties

bootstrap.servers=FIRST_EC2_PUBLIC_IP:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
acks=1
compression.type=lz4
plugin.path=/usr/share/java

jdbc_source.properties

name=test-mysql-jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/DB_NAME
connection.user=root
connection.password=DB_PASSWORD
table.whitelist=TEST_TABLE  
mode=timestamp+incrementing
incrementing.column.name=ID_RECORD
timestamp.column.name=TIMESTMP
topic.prefix=mysql-source-
acks=1
compression.type=lz4

Я пытался манипулировать различными настройками и параметрами.В основном я пытался поиграться с опциями offset.flush.timeout.ms и buffer.memory, как рекомендовано в этой ссылке .

Поведение производителя:

В общем, после запуска производителя на моем втором экземпляре EC2 я вижу сообщения на своем ноутбуке в консоли kafka для потребителей, так что это работает.Я вижу новые записи по мере их появления.Однако очень часто, когда создается новая строка в таблице, производитель просто не отправляет ее на сервер kafka (первый экземпляр EC2), выдавая вышеупомянутую ошибку в течение 5-20 минут.Количество выдающихся сообщений не становится большим.Большую часть времени это 2-6 сообщений.После выдачи ошибки в течение 5-20 минут он, наконец, отправляет данные, и потребитель консоли потребляет эти данные и некоторое время нормально работает, и после этого снова появляется ошибка.

Если я вручную остановлю производителя и запуском егоопять же, ожидающие сообщения мгновенно вспыхивают и могут быть видны в консоли пользователя на моем ноутбуке.

Не могли бы вы указать мне, где может быть проблема, и что может вызвать такое поведение.

...