Весенняя кафка потребляет записи с некоторой задержкой - PullRequest
0 голосов
/ 24 февраля 2020

Я использую пружинную кафку в своем приложении. Я хочу добавить некоторую задержку в 15 минут для использования записей для одного из слушателей - kafkaRetryListenerContainerFactory. У меня есть два слушателя. Ниже моя конфигурация:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(primaryConsumerFactory());
    return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaRetryListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(retryConsumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(true);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);       
    return factory;
}

Прослушиватель повторных попыток Кафки:

@KafkaListener(topics = "${spring.kafka.retry.topic}", groupId = "${spring.kafka.consumer-group-id}", 
        containerFactory = "kafkaRetryListenerContainerFactory", id = "retry.id")
public void retryMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
    Thread.sleep(900000);
    LOG.info(String.format("Consumed retry message -> %s", record.toString()));
    acknowledgment.acknowledge();
}

Когда я добавил Thread.sleep (), я получаю постоянную ошибку перебалансировки в журналах

Attempt to heartbeat failed since group is rebalancing

Моя весенняя версия кафки - 2.3.4

Ниже приведены значения конфигурации:

max.poll.interval.ms = 1200000 (это выше, чем thread.sleep)

heartbeat.interval.ms = 3000

session.timeout.ms = 10000

Я попробовал ack.nack (900000); Все еще получаю ошибку перебалансировки

Любая помощь будет оценена

1 Ответ

1 голос
/ 24 февраля 2020

Фильтр не правильный подход; вам нужно Thread.sleep() потока и убедиться, что max.poll.interval.ms больше, чем общее время сна и время обработки для записей, полученных опросом.

В 2.3 контейнер имеет возможность спать между опросами ; в более ранних версиях вы должны спать сами.

РЕДАКТИРОВАТЬ

Я только что нашел это в моем server.properties (доморощенный на Ма c ОС):

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

Это объясняет, почему мы видим разделы, изначально назначенные первому потребителю (см. Комментарий ниже).

Установка для него значения по умолчанию 3000 работает для меня.

...