Кафка реставрирующая группа с использованием пружинного кафка потребителя - PullRequest
0 голосов
/ 03 мая 2018

У меня довольно медленный потребитель, который может занять более 5 минут для обработки записи. Чего я хочу избежать, так это как Кафка стабилизирует группу. Чтобы сделать это из моего понимания, я должен установить для брокера kafka следующие свойства:

  group.max.session.timeout.ms = 3600001 
  group.min.session.timeout.ms = 3600000

В моей стороне приложения у меня есть следующая конфигурация:

    @Bean
      public Map<String, Object> consumerConfigs() {
        final Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            environment.getProperty("app.kafkaBrokers"));
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.valueOf(environment.getProperty("app.session.timeout.ms")) );
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.valueOf(environment.getProperty("app.session.timeout.ms")) + 1 );
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return propsMap;
      }

@Bean
  KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(9);// was 3
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }

А также в моем слушателе:

 @KafkaListener(id = "baz", topics = "tipJobsForExecution", containerFactory="kafkaListenerContainerFactory")
  public void listen(ConsumerRecord<?, ?> record)

Моему слушателю требуется около 5 минут для обработки сообщений. Как только он закончился, я прочитал следующее на стороне брокера kafka:

2018-05-03 10: 29: 11,210] ИНФОРМАЦИЯ [GroupCoordinator 0]: подготовка к перебалансировать группу баз со старым поколением 22 (__consumer_offsets-7) (Kafka.coordinator.group.GroupCoordinator)

Насколько я понимаю, кафка считает потребителя мертвым и восстанавливает равновесие в группе. Мой вопрос: почему это происходит? У меня есть одна идея, что, возможно, сердцебиение не сердцебиение каждые 3000 мс, как должно, но я не знаю, насколько это неприятно.

Заранее спасибо, Яннис

1 Ответ

0 голосов
/ 03 мая 2018

Вы должны знать о трех типах параметров конфигурации тайм-аута для потребителя Kafka.

heartbeat.interval.ms - ожидаемое время между пульсами для координатора потребителя при использовании средств управления группой Kafka. Обычно должно составлять 1/3 значения session.timeout Значение по умолчанию - 3000 мс

session.timeout.ms - Если посредник не получил тактовые импульсы до истечения этого тайм-аута сеанса, то брокер удалит этого потребителя из группы и инициирует значение re-balance.Default. 10000

max. Значение по умолчанию - 300000

В вашем случае, похоже, что интервал опроса установлен на слишком низкое значение.

Ссылка - https://kafka.apache.org/documentation/#newconsumerconfigs

...