Не удалось выполнить асинхронный запрос kafka - PullRequest
0 голосов
/ 08 марта 2019

Я заметил, что отставание потребителя кафки внезапно начинает увеличиваться после нескольких часов / дней подряд.

После проверки логов я вижу много исключений:

org.apache.kafka.clients.consumer.RetriableCommitFailedException: сбой фиксации смещения с повторяющимся исключением. Вы должны повторить попытку отправки последних смещений.

Мой класс ConsumerThread:

public class ConsumerThread implements Runnable {
  private final KafkaConsumer<String, Map<String, Object>> consumer;
  public ConsumerThread(
    this.consumer = new KafkaConsumer<>(getConsumerConfig(kafkaConfiguration));
  }

  @Override
  public void run() {
    try {
      consumer.subscribe(topicList);

      while (true) {
        ConsumerRecords<String, Map<String, Object>> records =
            consumer.poll(Duration.ofMillis(kafkaConfiguration.getPollIntervalMs()));

        long startPerPoll = System.nanoTime();
        for (final ConsumerRecord<String, Map<String, Object>> record : records) {
            // message processing logic here
        }


        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
              //log.error(exception.getMessage());
              log.info("exception while committing offset, consumerThread: {}, exception: {}", Thread.currentThread().getName(), exception);
              exception.printStackTrace();
            }
        });


      }
    } catch (Exception e) {
      // ignore for shutdown
      log.info("exception in run for consumerThread: {}", e);
    } finally {
      try {
        if (Objects.nonNull(consumer)) {
          consumer.commitSync();
        }
      } finally {
        if (Objects.nonNull(consumer)) {
          consumer.close();
        }
      }
    }
}

Моя конфигурация кафки:

groupId: cep-cg
autoCommitEnabled: false
sessionTimeoutMs: 30000
heartBeatIntervalMs: 10000
autoOffsetReset: latest
maxPollRecord: 250
maxPollIntervalMs: 180000
requestTimeoutMs: 240000
pollIntervalMs: 3000

Я проверил другие ответы на stackoverflow и сделал несколько настроек, но ни один из них, похоже, не работает.

Что я хочу знать:

  1. Есть какие-нибудь сведения о том, почему задержка может внезапно увеличиваться?

  2. Возможно ли, что много запросов commitAsync ожидает на брокере, и, возможно, какое-то время (определенное некоторыми настройками на брокере), запрос commitAsync начинает терпеть неудачу?

  3. Допустим, для обработки сообщения потребителю потребовалось больше max.poll.interval.ms времени. В этом случае он будет исключен из группы и сработает перебалансировка. Теперь весь запрос commitAsync, ожидающий обработки на брокере, завершается с ошибкой CommitFailedException, поскольку раздел теперь принадлежит другому потребителю в группе. В приведенном выше коде потребитель выйдет из бесконечного цикла и будет закрыт навсегда. Это правильный путь? Или мне нужно перехватить CommitFailedException и снова возобновить цикл, чтобы потребитель остался в живых?

1 Ответ

0 голосов
/ 08 марта 2019

Представьте, что мы отправили запрос на фиксацию смещения 2000. Существует временная проблема связи, поэтому брокер никогда не получает запрос и, следовательно, никогда не отвечает. В то же время, мы обработали другую партию и успешно зафиксировали смещение 3000. Теперь повторяется попытка ранее завершившейся неудачной фиксации пакетов, и, за исключением, оно отображает то же сообщение. В случае перебалансировки это вызовет больше дубликатов

A. Время запаздывания увеличивается

Поскольку перебалансировка происходит чаще из-за того, что потребитель не потребляет записи постоянно, а продюсер постоянно производит записи.

B. тайм-аут запроса commitAsync

Только активные члены группы могут фиксировать смещения. Если потребитель был исключен из группы при попытке зафиксировать смещение, он выбросит CommitFailedException

c.Rebalance

Когда начинается перебалансировка, потребитель должен завершить любую обработку, которую он выполняет в данный момент, зафиксировать смещения и снова присоединиться к группе до истечения времени ожидания сеанса.

Мы должны повлиять на consumerRebalanceListener и использовать onPartitionsRevoked () для фиксации смещений перед тем, как потерять владение разделом для фиксации текущего смещения.

max.poll.interval.ms и max.poll.records до достаточно низкого значения, сохраняя при этом значение session.timeout.ms, чтобы время обнаружения сбоев не было нужно пожертвовать.

CommitFailedException, выброшенное из commitSync (). Это гарантирует, что только активные члены группы могут фиксировать смещения. Если потребитель был исключен из группы, то его разделы будут назначены другому участнику, который будет фиксировать свои собственные смещения.

...