Как избежать poll (), получающих одно и то же сообщение, если commitSync () не вызывается - PullRequest
0 голосов
/ 02 января 2019

Я немного озадачен ожидаемым поведением poll ().В моем приложении, если processLogic () работает, я должен вручную зафиксировать смещение, чтобы при следующем вызове poll () я получал новые сообщения.

Проблема возникает, когда processLogic () выдает ошибку.Я настроил потребителя на поиск смещения, не выполненного во время обработки.При следующем опросе () он снова получает то же сообщение (правильное поведение, как я приказал потребителю вручную сбросить смещение в эту позицию) Представьте, что оно работает нормально, и также вызывается doCommitSync ().

Неожиданное поведениепроисходит в следующем опросе ().Он должен принимать новые сообщения, но он по-прежнему извлекает последнее сообщение, что приводит к повторному вызову функции processLogic () и doCommitSync ().Он также выдает следующую ошибку во время doCommitSync ():

Не удается завершить принятие, поскольку группа уже перебалансировала и присвоила разделы другому участнику.Это означает, что время между последующими вызовами poll () было больше, чем настроенный max.poll.interval.ms, что обычно означает, что цикл опроса тратит слишком много времени на обработку сообщений.Вы можете решить эту проблему, увеличив тайм-аут сеанса или уменьшив максимальный размер пакетов, возвращаемых в poll () с max.poll.records.

Моя конфигурация потребителя:

enable.auto.commit=false

isolation.level=read_committed

auto.offset.reset=latest

public void runConsumer() {
    Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));

    try {
      consumer.subscribe(topics);

      while (!closed.get()) {

        processedStatus.set(false);

        final ConsumerRecords<String, String> consumedRecords = consumer.poll(numRecords);
        if (!consumedRecords.isEmpty()) {
          StreamSupport.stream(consumedRecords.spliterator(), false)
            .map(ConsumerRecord::value)
            .forEach(record -> {
              try {
                processLogic(); //do some logic which can fail
                processedStatus.set(true);
              } catch (Exception e) {
                logger.error("Error applying action: " + record.getUuid(), e);
              }
              if (processedStatus.get()) {
                doCommitSync();
              } else {
                consumer.seek(new TopicPartition(recordTopic, recordPartition), recordOffset);
              }
            });
        }
      }
    } catch (WakeupException e) {
      logger.error("Kafka Consumer wakeup exception");
    } finally {
      alertConsumer.close();
      shutdownLatch.countDown();
    }
...