Имитация Кафки CommitFailedException - PullRequest
0 голосов
/ 31 января 2020

Я пытаюсь смоделировать исключение CommitFailedException, выданное Кафкой.

Я вручную устанавливаю для "session.timeout.ms" значение 10000 мс, а для "enable.auto.commit" - значение false.

После Kafkaconsumer.poll () у меня есть оператор Thread.sleep (12000), после которого я делаю коммит. Я ожидаю, что, поскольку поток принимает 12 с до следующего опроса, потребитель должен быть помечен как мертвый, а исключение CommitFailedException должно быть выдано. Тем не менее, процесс выполняется гладко.

Как я могу смоделировать исключение, выдаваемое KafkaConsumer.

consumer.subscribe(Arrays.asList("foo"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }

            try {
                Thread.sleep(12000);
            }catch (Exception e){
                e.printStackTrace();
            }
            consumer.commitSync();
        }

1 Ответ

1 голос
/ 31 января 2020

Кафка использует механизм сердцебиения через отдельную нить для проверки здоровья потребителя. Поток пульса потребителя должен отправить брокеру пульс до истечения времени session.timeout.ms.

heartbeat.interval.ms: Ожидаемое время между пульсами для координатора потребителя при использовании Средства управления группой Кафки. Пульс используется для обеспечения того, чтобы сеанс потребителя оставался активным, и для облегчения перебалансировки, когда новые потребители присоединяются или покидают группу.

session.timeout.ms: Время ожидания, используемое для обнаружения сбоев клиента, когда используя средство управления группой Кафки. Клиент периодически посылает биения, чтобы указать его жизнеспособность брокеру. Если посредник не получил пульса до истечения этого тайм-аута сеанса, то брокер удалит этого клиента из группы и инициирует перебалансировку.

Еще одним механизмом проверки жизнедеятельности потребителей является опрос. Ожидается, что потребитель опрашивает () без истечения срока действия max.poll.interval.ms. Если это время истекает (обычно долго работающий процесс приводит к этой проблеме), снова потребитель считается мертвым.

max.poll.interval.ms: Максимальная задержка между вызовами опроса ( ) при использовании управления группами потребителей. Это накладывает верхнюю границу на количество времени, в течение которого потребитель может бездействовать до получения большего количества записей. Если poll () не вызывается до истечения этого тайм-аута, то потребитель считается сбойным, и группа выполнит балансировку, чтобы переназначить разделы другому участнику.

Если потребитель считается мертвым Kafka либо из-за отсутствия пульса в session.timeout.ms, либо из-за отсутствия опроса в max.poll.interval.ms потребитель не может фиксировать сообщения и получает CommitFailedException.

CommitFailedException: Это исключение возникает, когда фиксация смещения с KafkaConsumer.commitSyn c () завершается с ошибкой, которую невозможно исправить. Это может произойти, когда групповая перебалансировка завершится до того, как фиксация может быть успешно применена. В этом случае фиксация обычно не может быть повторена, поскольку некоторые из разделов, возможно, уже были назначены другому члену в группе.

В результате; Поскольку поток сердцебиения является отдельным потоком, сон в вашем коде не может повлиять на это. Но в вашем случае вы можете установить max.poll.interval.ms на 10 секунд, чтобы получить CommitFailedException.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...