Я использую Spring Kafka для разработки своего проекта. Я использую topi c, который имеет:
разделов: 5
min.insyn c .replicas: 3
Я вызываю несколько конечных точек отдыха внутри "@KakfaListener" и любой ответ, который я получаю от тех конечных точек отдыха, которые я публикую, для двух разных топик c.
Для конечных точек отдыха это занимает около 20-25 секунд, а для публикации двум другой топи c занимает около 15 секунд.
В среднем мы завершаем все вызовы конечной точки отдыха и вызов издателя Kafka за 35-40 секунд, что, я считаю, меньше максимального интервала опроса. Но после обработки около 100 записей я получаю следующее исключение:
CommitFailedException: коммит не может быть завершен, так как группа уже перебалансировала и присвоила разделы другому члену. Это означает, что время между последующими вызовами poll () было больше, чем настроенное max.poll.interval.ms
Следующее, я дал потребительское свойство:
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(GROUP_ID_CONFIG, groupId);
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(AUTO_OFFSET_RESET_CONFIG, offsetReset);
props.put(ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
props.put(MAX_POLL_RECORDS_CONFIG, 50);
props.put(SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(MAX_POLL_INTERVAL_MS_CONFIG, 300000);
и ниже слушатель:
@KafkaListener(topics = "${test.topic}")
public void consume(
@NotNull final ConsumerRecord<String, String> cr,
@Payload final String payload,
final Acknowledgment acknowledgment) {
final XYZObject xyzObject =
objectMapper.readValue(payload, XYZObject.class);
log.info("Name =\"{}\"", xyzObject.getName());
final Instant prcTime = Instant.now();
final Response response = processPayload(xyzObject);
log.info(
"processPayload() took =\"{}\"ms", Duration.between(prcTime, Instant.now()).toMillis());
final Instant pblshTime = Instant.now();
publishResponseToKafka(response, xyzObject);
log.info(
"publishResponseToKafka() took =\"{}\"ms", Duration.between(pblshTime, Instant.now()).toMillis());
try {
acknowledgment.acknowledge();
} catch(CommitFailedException ce){
log.error("commitFailedException: {}", ce);
}
}