TimeoutException при отправке сообщений в тему кафки - PullRequest
2 голосов
/ 27 мая 2019

У меня проблема с созданием сообщений в теме Кафки.

При создании нескольких сообщений в теме Кафки периодически происходит сбой с исключением:

org.apache.kafka.common.errors.TimeoutException: Срок истечения 6 записей для некоторой темы-1: 30056 мс прошло с момента создания пакета плюс время ожидания

Проблема, кажется, отличается от других, которые яуже найден здесь.Это происходит с очень низкой нагрузкой.Я уверен, что я не превышаю размер буфера (значение по умолчанию).linger.ms настройка установлена ​​на 0 (по умолчанию).Для request.timeout.ms также установлено значение по умолчанию, равное 30000.

На самом деле мой вопрос: есть ли что-нибудь, что могло бы задержать отправку процесса сообщения kafka в течение 30 секунд между вызовом send() метода и его фактической отправкой?Я ищу все, что может сделать ProducerBatch время жизни более 30 секунд при очень низкой нагрузке (или без нагрузки).

Я использую управляемую услугу Kafka от внешнего провайдера, поэтому я спросил его о статусе брокераи он сказал, что все в порядке.Кстати, это происходит в трех разных случаях Кафки.Версия клиента Kafka тоже не имеет значения - это происходит как для 0.11.0.0, так и для 2.0.1.

1 Ответ

1 голос
/ 28 мая 2019

Я только что посмотрел документацию Kafka для версии 2.0.X и скажу, что вы сталкиваетесь с этой ошибкой из-за настроек по умолчанию и низкой загрузки сообщений.

Давайте попробуем разобрать ваше сообщение об ошибке:

org.apache.kafka.common.errors.TimeoutException: Срок действия истекает 6 записи for some-topic-1: 30056 мс прошло с момента создания пакета плюс задержка время

  1. Первый признак ошибки: 30056 мс прошло - Конфигурация request.timeout.ms разыгрывается здесь. Значение по умолчанию для этой конфигурации составляет 30000, то есть 30 секунд. Это 30-секундное ожидание для заполнения буфера на стороне источника - я объяснил это в следующем пункте.
  2. Второй ключ в сообщении об ошибке: с момента создания пакета - При вызове метода send() - сообщение буферизуется на стороне источника и будет ждать 30 секунд ( как указано в первом свойстве config) для заполнения пакета. Теперь batch.size имеет значение по умолчанию 16384 байта, т.е. 16 КБ, поэтому, если загрузка загрузки сообщений низкая (как в вашем случае), и буфер не заполняется большим количеством сообщений размером до 16 КБ в 30 секунд - вы ожидаете это сообщение об ошибке. Итак, в вашем случае кажется, что запись Expiring 6 на самом деле не составляет 16 КБ, необходимых для заполнения пакета.
  3. Наконец, в ошибке также упоминается плюс время задержки - это конфигурация linger.ms, которая находит применение в ситуациях, когда нагрузка при приеме пищи высока и производитель хочет ограничить send() звонки брокеру (ам) Kafka. Это продолжительность, которую Производитель будет ожидать, прежде чем отправлять сообщения посреднику после того, как пакет будет готов (т. Е. Буфер заполнен до batch.size). Значение по умолчанию для этого равно 0, поэтому в вашем случае производитель будет ждать 0 миллисекунд после того, как пакет будет готов. Но ваш буфер на самом деле не заполняется в данное время.

Теперь, чтобы ответить на ваш вопрос : Вы можете увеличить linger.ms, чтобы иметь возможность задерживать сообщения дольше после того, как пакет будет готов. Если вам нужно больше времени для заполнения партии, вам нужно увеличить request.timeout.ms. Вы могли бы также попробовать комбинацию обоих.

В аналогичных строках, чтобы исправить эту ошибку, вы можете увеличить request.timeout.ms или уменьшить batch.size или, возможно, оба.

Надеюсь, это поможет!

...