Исключение тайм-аута для производителя Kafka: даже при максимальном тайм-ауте запроса и правильном размере партии - PullRequest
1 голос
/ 02 мая 2020

В настоящее время у нас есть около 80 приложений (около 200 реплик K8), ежедневно записывающих 16-17 миллионов записей в kafka, и некоторые из этих записей периодически терпели неудачу с исключениями по времени ожидания и перебалансировке. Частота отказов была менее 0,02%.

Мы проверили и настроили все параметры должным образом, как это было предложено другими ссылками на стек-потоки, и все же мы получаем несколько проблем.

Одна проблема связана с Rebalance , Мы сталкиваемся с проблемами на стороне производителя и потребителя, как с этой проблемой. Для Потребителя мы используем автоматическую фиксацию, и иногда Кафка перебалансирует, а потребитель получает дубликаты записей. мы не ставили дубликат проверки, потому что это снизит скорость обработки, а уровень дублирования записи будет меньше 0,1%. Мы подумываем о том, чтобы перейти к ручному фиксированию и управлению смещениями с использованием базы данных. Но нужно понять с точки зрения брокеров Kafka, почему восстановление баланса происходит ежедневно.

Ошибка источника:

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Вторая проблема связана с Исключение тайм-аута . Это происходит периодически для некоторых приложений. Производитель пытался отправить запись, и она была добавлена ​​в пакет, и она не смогла доставить до истечения времени ожидания запроса, которое мы увеличили до 5 минут. В идеальном случае Кафку следует повторять через определенный интервал. Во время отладки мы обнаружили, что накопитель записей истекает из предыдущего пакета, даже не пытаясь отправить их в случае истечения времени ожидания запроса - это ожидаемое поведение? Можем ли мы в любом случае добавить повтор для этого?

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for xxTopic-41:300014 ms has passed since batch creation. </b>

Конфигурация: 1. 5 брокеров и 3 зоопарка - Кафка версия 2.2 2. Брокеры работают в Куберне с statefulset. 3. Каждый брокер имеет 32 ГБ и 8 ЦП , как рекомендовано Confluent for Production. 4. Topi c имеет 200 разделов и 8 потребителей копий. 5. Каждый потребитель обрабатывает только около 25-30 потоков. Потребитель имеет емкость 4 ГБ и 4 ЦПУ.


@Value("${request.timeout:300000}") <br/>
private String requestTimeOut;

@Value("${batch.size:8192}") <br/>
private String batchSize;

@Value("${retries:5}") <br/>
private Integer kafkaRetries;

@Value("${retry.backoff.ms:10000}") <br/>
private Integer kafkaRetryBackoffMs;

Поскольку мы из команды разработчиков, у нас не было особого понимания сетевого аспекта, нужна помощь, если это связано с перегрузкой сети или нам нужно что-то улучшить в самом приложении. Мы не сталкивались с какими-либо проблемами, когда нагрузка составляла менее 10 миллионов в день, а многие новые приложения отправляли сообщения и увеличивали нагрузку. Мы периодически наблюдаем две вышеупомянутые проблемы.

Ответы [ 2 ]

0 голосов
/ 03 мая 2020

Думаю, мне не следует соглашаться с Гиоргосом, хотя, возможно, я просто неправильно понимаю его точку зрения.

Производители и потребители только когда-либо общаются с лидером раздела - другие реплики просто находятся в режиме ожидания на случай, если лидер выходит из строя, и они действуют как потребители для лидера, чтобы поддерживать свои данные в актуальном состоянии.

Однако при запуске приложения клиентский код может подключиться к любому из посредников и узнает лидеров разделов при получении метаданных. Эти метаданные затем кешируются на стороне клиента, но если на стороне посредника происходит смена лидера, то есть когда вы увидите исключение NotLeaderForPartitionException, и это заставит клиентский код снова извлечь метаданные, чтобы получить текущий набор лидеров раздела. , Выбор лидера требует времени, и поэтому в этом процессе будет некоторая задержка, но это признак того, что репликация и устойчивость на уровне брокера работают правильно.

На стороне потребителя ручное принятие или автоматическое принятие сделают нет разницы, если вы используете стандартную фиксацию смещения topi c - автокоммит просто означает, что каждый раз при опросе будут обрабатываться предыдущие обработанные сообщения (фактически, возможно, не КАЖДЫЙ опрос), но это, скорее всего, то же самое, что вы делали бы вручную. Хранение смещений в базе данных поможет сохранить все транзакции, если обработка сообщения означает обновление данных в этой базе данных - в этом случае вы можете зафиксировать смещения и обработанные данные в одной и той же транзакции БД.

По сути, я вполне вы наверняка понимаете, что дубликаты - это неизбежная часть масштабируемости потребителя, поскольку она позволяет любому процессу потребителя выбрать раздел и go от последнего зафиксированного смещения. Дубликаты возникают, когда потребитель обработал часть пакета, а затем считается, что он находится в автономном режиме, либо потому, что процесс фактически завершился, либо из-за того, что обработка пакета заняла слишком много времени. Чтобы избежать дублирования, вы должны убедиться, что каждое обработанное сообщение связано с фиксацией в одной и той же транзакции. Обычно это стоимость пропускной способности, но поскольку вы предлагаете ручную фиксацию каждого сообщения, а не на уровне пакета, а сохранение смещений в одной и той же транзакции БД может предотвратить дублирование потребления.

По вопросу, почему перебалансировка На это есть только две причины - изменение количества разделов в топи c или предполагаемое изменение в составе группы потребителей. Для этого есть две возможные причины - остановка потока пульса, что обычно означает остановку приложения-потребителя или обработку пакета, превышающую max.poll.interval.ms (эта конфигурация предназначена для остановки livelock, когда потребитель активен и отправляет тактовые импульсы, но прекратил опрос). Это последняя нормальная причина перебалансировок за пределами перезапусков приложений - неизбежно иногда возникает некоторая задержка где-то в любой системе, и поэтому перебалансировки потребителей обычно считаются нормальными, если они не происходят слишком часто из-за небольшой задержки при обработке пакета.

Я не уверен в проблемах со стороны производителя - в моем случае я обрабатываю дубликаты у потребителя, а в производителе я просто разрешаю большое количество повторных попыток , с acks = all (необходимо, если вы не можете позволить себе потерять сообщения) и 1 максимальный запрос в полете (для обеспечения заказа). Связаны ли тайм-ауты производителя с NotLeaderForPartitionException? Это просто из-за лидерских выборов?

(есть еще некоторые подробности в https://chrisg23.blogspot.com/2020/02/kafka-acks-configuration-apology.html - немного бессвязное сообщение в блоге, но может быть интересным)

0 голосов
/ 02 мая 2020

Что касается ошибки со стороны производителя, обязательно включите всех брокеров, которые являются лидерами разделов для вашей топи c. Вы можете узнать, какой брокер является лидером раздела, запустив:

./kafka-topics.sh \
    --zookeeper zookeeper-host:2181 \
    --describe \
    --topic your-topic-name 


Topic: your-topic-name   PartitionCount:3    ReplicationFactor:1 
Topic: your-topic-name   Partition: 0    Leader: 2   Replicas: 2 Isr: 2
Topic: your-topic-name   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
Topic: your-topic-name   Partition: 2    Leader: 1   Replicas: 1 Isr: 1

В приведенном выше примере вам нужно будет предоставить все адреса для брокеров 0, 1 и * 1006. *.

...