Потребители кафки не потребляют после простоя в течение длительного времени - PullRequest
0 голосов
/ 20 октября 2018

Привет! У меня есть данные о потребителях кафки.Следующая команда дает мне тайм-аут команды группы потребителей.

kafka-consumer-groups.sh --bootstrap-server b1:9092,b2:9092,b3:9092,b4:9092,b5:9092,b6:9092,b7:9092,b8:9092,b9:9092,b10:9092,b11:9092,b12:9092,b13:9092 --describe --group testgroup

Ошибка: сбой при выполнении команды группы клиентов из-за истечения времени ожидания команды группы потребителей во время ожидания инициализации группы:

Все потребители использовали данные более 26 часов.Был разрыв в течение более 6 часов, поскольку производители перестали производить данные для этих 6 часов.

Я подозреваю, что какое-то время IDLE могло разъединить соединение между группами потребителей с потребителями.Все потребители потребляют с интервалом опроса 100 мс poll(100).

Этот сценарий наблюдался более 3 раз.Любая помощь от экспертов Kafka приветствуется.Спасибо.

Код:

@Service
public class DedupeConsumerService {

    final Logger logger = LoggerFactory.getLogger(DedupeConsumerService.class);

    @Autowired
    private TaskExecutor taskExecutor;

    @Autowired
    private PropertyConfig config;

    @Autowired
    private ApplicationContext applicationContext;

    public void consume() {

        String topic = config.getDedupServiceConsumerTopic();
        String consGroup = config.getDedupServiceConsGroup();

        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "20000");
        props.put("max.poll.records", "10000");

        KafkaConsumer<String, AvroSyslogMessage> consumer = new GenericConsumer<String, AvroSyslogMessage>().initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.AVRODESER, props);

        logger.info("Dedupe Kafka Consumer Initialized......");

        try {
            while (true) {
                ConsumerRecords<String, AvroSyslogMessage> records = consumer.poll(100);
                if (records.count() > 0) {

                    }


                    logger.info("Number of Records:: " + records.count() + " Time took to process poll :: " + durationInMilliSec);

                }
            }

        } catch (Throwable e) {
            logger.error("Error occured while processing message", e);
            e.printStackTrace();
        } finally {
            logger.debug("dedupe kafka consume is closing");
            consumer.close();
        }

    }

}

Я попытался установить время опроса, так как Integer.MAX_VALUE это не помогло.

...