Привет! У меня есть данные о потребителях кафки.Следующая команда дает мне тайм-аут команды группы потребителей.
kafka-consumer-groups.sh --bootstrap-server b1:9092,b2:9092,b3:9092,b4:9092,b5:9092,b6:9092,b7:9092,b8:9092,b9:9092,b10:9092,b11:9092,b12:9092,b13:9092 --describe --group testgroup
Ошибка: сбой при выполнении команды группы клиентов из-за истечения времени ожидания команды группы потребителей во время ожидания инициализации группы:
Все потребители использовали данные более 26 часов.Был разрыв в течение более 6 часов, поскольку производители перестали производить данные для этих 6 часов.
Я подозреваю, что какое-то время IDLE могло разъединить соединение между группами потребителей с потребителями.Все потребители потребляют с интервалом опроса 100 мс poll(100)
.
Этот сценарий наблюдался более 3 раз.Любая помощь от экспертов Kafka приветствуется.Спасибо.
Код:
@Service
public class DedupeConsumerService {
final Logger logger = LoggerFactory.getLogger(DedupeConsumerService.class);
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private PropertyConfig config;
@Autowired
private ApplicationContext applicationContext;
public void consume() {
String topic = config.getDedupServiceConsumerTopic();
String consGroup = config.getDedupServiceConsGroup();
Properties props = new Properties();
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "20000");
props.put("max.poll.records", "10000");
KafkaConsumer<String, AvroSyslogMessage> consumer = new GenericConsumer<String, AvroSyslogMessage>().initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.AVRODESER, props);
logger.info("Dedupe Kafka Consumer Initialized......");
try {
while (true) {
ConsumerRecords<String, AvroSyslogMessage> records = consumer.poll(100);
if (records.count() > 0) {
}
logger.info("Number of Records:: " + records.count() + " Time took to process poll :: " + durationInMilliSec);
}
}
} catch (Throwable e) {
logger.error("Error occured while processing message", e);
e.printStackTrace();
} finally {
logger.debug("dedupe kafka consume is closing");
consumer.close();
}
}
}
Я попытался установить время опроса, так как Integer.MAX_VALUE
это не помогло.