весенняя загрузка кафки session.timeout - PullRequest
0 голосов
/ 26 октября 2019

В моем проекте springBoot я использую kafka прослушивающее сообщение, например:

@KafkaListener(topics = Constants.ARTICLE_TOPIC, groupId = "articleConsumer")
private void receiveArticle(String content) {
    try {
        if (null != content) {
            messageHandler.handleMessage(content);
        }
    } catch (Exception e) {
        logger.error("===Kafka[Article]Consumer error===", e);
    }
}

@KafkaListener(topics = Constants.BUSINESS_TOPIC, groupId = "businessConsumer")
private void receiveBusiness(String content) {}

@KafkaListener(topics = Constants.RULE_TOPIC, groupId = "ruleConsumer")
private void receiveRule(String content) {}

, а метод handleMessage:

public void handleMessage(String msg) throws Exception {
    logger.error("---------handle message-------------");
    ...code handle message...
}

У меня есть breakpoint в logger.error("---------handle message-------------");

, когда я получил msg, программа остановится в logger.error("---------handle message-------------"); и после 10s, я нажимаю F9 (тем временем отменяю breakpoint), такпрограмма запустится.

msg будет обработан 20s-30s.

Я запутался, когда у меня breakpoint и я жду больше 10s, я получу kafka ошибка, breakpoint остановит kafka отправку heartbeat?

ошибка:

2019-10-26 14:34:49,710 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] internals.ConsumerCoordinator (:) - [Consumer clientId=consumer-4, groupId=businessConsumer] Offset commit failed on partition medium_business_status-0 at offset 0: The coordinator is not aware of this member.
2019-10-26 14:34:49,710 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] internals.ConsumerCoordinator (:) - [Consumer clientId=consumer-2, groupId=ruleConsumer] Offset commit failed on partition rules-0 at offset 0: The coordinator is not aware of this member.
2019-10-26 14:34:49,710 WARN  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] internals.ConsumerCoordinator (:) - [Consumer clientId=consumer-4, groupId=businessConsumer] Asynchronous auto-commit of offsets {medium_business_status-0=OffsetAndMetadata{offset=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2019-10-26 14:34:49,710 WARN  [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] internals.ConsumerCoordinator (:) - [Consumer clientId=consumer-2, groupId=ruleConsumer] Asynchronous auto-commit of offsets {rules-0=OffsetAndMetadata{offset=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2019-10-26 14:34:49,711 WARN  [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] internals.ConsumerCoordinator (:) - [Consumer clientId=consumer-2, groupId=ruleConsumer] Synchronous auto-commit of offsets {rules-0=OffsetAndMetadata{offset=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2019-10-26 14:34:49,711 WARN  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] internals.ConsumerCoordinator (:) - [Consumer clientId=consumer-4, groupId=businessConsumer] Synchronous auto-commit of offsets {medium_business_status-0=OffsetAndMetadata{offset=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

businessConsumer и ruleConsumer не имеют ничего общегос дескриптором msg, между тем у них нет сообщений, но есть ошибка, а ошибка articleConsumer:

2019-10-26 14:34:50,237 INFO  [kafka-coordinator-heartbeat-thread | articleConsumer] internals.AbstractCoordinator (:) - [Consumer clientId=consumer-6, groupId=articleConsumer] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2019-10-26 14:34:53,500 INFO  [kafka-coordinator-heartbeat-thread | articleConsumer] internals.AbstractCoordinator (:) - [Consumer clientId=consumer-6, groupId=articleConsumer] Attempt to heartbeat failed for since member id consumer-6-944e8b2d-7519-461c-99e6-e957f9eb97f6 is not valid.

Я знаю kafka свойства:

 `session.timeout.ms = 10000
  max.poll.interval.ms = 300000`

session.timeout.ms используется для судьи consumer жив;

max.poll.interval.ms - это максимальное время между двумя poll().

версия Springboot <version>2.1.6.RELEASE</version>

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...