CommitFailedException: принятие не может быть завершено, так как группа уже перебалансировала и присвоила разделы другому участнику - PullRequest
0 голосов
/ 30 апреля 2020

Я использую Spring Kafka для разработки своего проекта. Я использую topi c, который имеет:

разделов: 5

min.insyn c .replicas: 3

Я вызываю несколько конечных точек отдыха внутри "@KakfaListener" и любой ответ, который я получаю от тех конечных точек отдыха, которые я публикую, для двух разных топик c.

Для конечных точек отдыха это занимает около 20-25 секунд, а для публикации двум другой топи c занимает около 15 секунд.

В среднем мы завершаем все вызовы конечной точки отдыха и вызов издателя Kafka за 35-40 секунд, что, я считаю, меньше максимального интервала опроса. Но после обработки около 100 записей я получаю следующее исключение:

CommitFailedException: коммит не может быть завершен, так как группа уже перебалансировала и присвоила разделы другому члену. Это означает, что время между последующими вызовами poll () было больше, чем настроенное max.poll.interval.ms

Следующее, я дал потребительское свойство:

props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(GROUP_ID_CONFIG, groupId);
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(AUTO_OFFSET_RESET_CONFIG, offsetReset);
props.put(ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
props.put(MAX_POLL_RECORDS_CONFIG, 50);
props.put(SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(MAX_POLL_INTERVAL_MS_CONFIG, 300000);

и ниже слушатель:

@KafkaListener(topics = "${test.topic}")
public void consume(
      @NotNull final ConsumerRecord<String, String> cr,
      @Payload final String payload,
      final Acknowledgment acknowledgment) {

    final XYZObject xyzObject =
        objectMapper.readValue(payload, XYZObject.class);

    log.info("Name =\"{}\"", xyzObject.getName());
    final Instant prcTime = Instant.now();
    final Response response = processPayload(xyzObject);
    log.info(
        "processPayload() took =\"{}\"ms", Duration.between(prcTime, Instant.now()).toMillis());

    final Instant pblshTime = Instant.now();
    publishResponseToKafka(response, xyzObject);
    log.info(
            "publishResponseToKafka() took =\"{}\"ms", Duration.between(pblshTime, Instant.now()).toMillis());
    try {
    acknowledgment.acknowledge();
    } catch(CommitFailedException  ce){
      log.error("commitFailedException: {}", ce);
    }
}
...