Kafka Connect - не удалось очистить, истекло время ожидания для производителя, чтобы сбросить ожидающие сообщения - PullRequest
2 голосов
/ 04 апреля 2019

Я пытаюсь использовать исходный соединитель Kafka Connect JDBC со следующими свойствами в режиме BULK.

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=timestamp
connection.password=XXXXX
validate.non.null=false
tasks.max=1
producer.buffer.memory=2097152
batch.size=1000
producer.enable.idempotence=true
offset.flush.timeout.ms=300000
table.types=TABLE,VIEW
table.whitelist=materials
offset.flush.interval.ms=5000
mode=bulk
topic.prefix=mysql-
connection.user=kafka_connect_user
poll.interval.ms=200000
connection.url=jdbc:mysql://<DBNAME>
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

Я получаю следующую ошибку при фиксации смещений, изменение различных параметров, похоже, не дает большого эффекта.

[2019-04-04 12:42:14,886] INFO WorkerSourceTask{id=SapMaterialsConnector-0} flushing 4064 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-04-04 12:42:19,886] ERROR WorkerSourceTask{id=SapMaterialsConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 712 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)

1 Ответ

0 голосов
/ 04 апреля 2019

Ошибка указывает на то, что буферизовано много сообщений, и их нельзя сбросить до истечения времени ожидания.Для решения этой проблемы вы можете

  • либо увеличить offset.flush.timeout.ms параметр конфигурации в ваших конфигурациях Kafka Connect Worker
  • , либо уменьшить объем буферизуемых данных, уменьшив значение producer.buffer.memory вваши рабочие настройки Kafka Connect Worker.Это лучший вариант, когда у вас довольно большие сообщения.
...