Кафка Производитель TimeOutException - PullRequest
0 голосов
/ 09 ноября 2018

Я выполняю потоковое задание Samza, которое записывает данные в тему Кафки. Кафка работает с 3-х узловым кластером. Самза работа развернута на пряжу. Мы видим множество этих исключений в журналах контейнеров:

 INFO [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.container.ContainerHeartbeatMonitor:[ContainerHeartbeatMonitor:stop:61] - [main] - Stopping ContainerHeartbeatMonitor
ERROR [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.runtime.LocalContainerRunner:[LocalContainerRunner:run:107] - [main] - Container stopped with Exception. Exiting process now.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:147)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:694)
        at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:104)
        at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:149)
Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:181)
        at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
        at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for Topic3-16 due to 30332 ms has passed since last attempt plus backoff time

Эти 3 типа исключений встречаются очень часто.

59088 org.apache.kafka.common.errors.TimeoutException: Expiring 115 record(s) for Topic3-1 due to 30028 ms has passed since last attempt plus backoff time

61015 org.apache.kafka.common.errors.TimeoutException: Expiring 60 record(s) for Topic3-1 due to 74949 ms has passed since batch creation plus linger time

62275 org.apache.kafka.common.errors.TimeoutException: Expiring 176 record(s) for Topic3-4 due to 74917 ms has passed since last append

Пожалуйста, помогите мне понять, в чем здесь проблема. Всякий раз, когда это произошло, контейнер Samza перезапускается.

1 Ответ

0 голосов
/ 09 ноября 2018

Ошибка указывает, что некоторые записи помещаются в очередь с большей скоростью, чем они могут быть отправлены с клиента.

Когда ваш источник отправляет сообщения, они сохраняются в буфере (перед отправкой целевому посреднику), а записи группируются в пакеты для увеличения пропускной способности. Когда новая запись добавляется в пакет, она должна быть отправлена ​​в пределах окна -configurable- time, которое управляется request.timeout.ms (по умолчанию установлено значение 30 секунд). Если пакет находится в очереди в течение более длительного времени, выдается TimeoutException, и записи пакета затем удаляются из очереди и не доставляются посреднику.

Увеличение значения request.timeout.ms должно помочь вам.

В случае, если это не работает, вы также можете попробовать уменьшить batch.size, чтобы пакеты отправлялись чаще (но на этот раз будет меньше сообщений), и убедитесь, что для linger.ms установлено значение 0 (по умолчанию) значение).

Обратите внимание, что вам нужно перезапустить брокеров kafka после изменения любого параметра конфигурации.

Если вы все еще получаете сообщение об ошибке, я предполагаю, что с вашей сетью что-то не так. Вы включили SSL?

...