Kafka Приостановка раздела и длительное (10 минут) восстановление - PullRequest
0 голосов
/ 30 августа 2018

Я использую пружинную кафку со следующими конфигурациями:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
environment.getProperty("kafka.url"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
KafkaAvroDeserializer.class);
props.put("specific.avro.reader", true);
props.put("schema.registry.url", 
environment.getProperty("schema.registry.url"));
props.put("session.timeout.ms", 120000);
props.put("request.timeout.ms", 300000);
props.put("max.partition.fetch.bytes", "10240");
props.put("max.poll.interval.ms", 120000);
props.put("max.poll.records", 100);

А

@Bean
public ConcurrentKafkaListenerContainerFactory<String, 
List<AlertNotificationPayload>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, 
List<AlertNotificationPayload>> factory = new 
ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(10);
}

и как только я запускаю сервер, потребители начинают потреблять сообщения, но через некоторое время разделы f ew приостанавливаются и не возобновляются в течение почти 9-10 минут . Время обработки в потоке потребителя составляет миллисекунды и имеет отдельный поток для обработки. Я пробовал несколько комбинаций конфигураций, но ничего не получалось.

Я также пытался использовать параметр factory.getContainerProperties (). SetPauseEnabled (false); что останавливает паузу / возобновление, но через некоторое время начинает происходить перебалансировка, и рано или поздно потребитель умирает, а через некоторое время у группы потребителей не остается потребителя .

Я использую следующие версии:

<dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
         <version>1.1.3.RELEASE</version> //tried latest one as well
</dependency>
<dependency>
        <groupId>io.confluent.maven</groupId>
        <artifactId>kafka-connect-quickstart</artifactId>
        <version>0.10.0.0</version>
</dependency>
<dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>3.2.0</version>
</dependency>

Пожалуйста, помогите.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...