Я хотел, чтобы у Kafka был фиксированный тариф, чтобы мы могли опрашивать () тему каждые три минуты и обрабатывать полученные записи (при условии, что эти записи будут обработаны в течение 3 минут).
Я написал код, подобный приведенному ниже
consumer.subscribe("myTopic", this); //this class implements ConsumerRebalanceListener
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
LOGGER.debug("FIRE");
try {
ConsumerRecords<String, String> consumerRecords = consumer
.poll(Duration.ofMillis(100)); //return quickly.
//myMessageHandler.handle(consumerRecords);
consumer.commitSync();//have set enable.auto.commit to false
} catch(Exception exception) {
consumer.close();
}
}, 5, 180, TimeUnit.SECONDS);//initial delay 5 sec, and then every 3 minutes
Кажется, что потребитель получает обратный вызов для onPartitionsRevoked () или onPartitionsAssigned () после ожидания в течение многих минут (около 9-10 минут).Таким образом, poll () не возвращает никаких записей в течение этого времени.
Однако, если я увеличу тайм-аут для customer.poll () до 100 мс с 100 мс.Перебалансировка (распределение разделов) происходит во время первого вызова poll () и начинает возвращать записи.
Имеет ли назначение раздела какую-либо зависимость от тайм-аута poll ()?Кажется, что если для poll () задано много времени, назначение раздела происходит при самом первом вызове poll (), тогда как при опросе (timeout <= 100ms) требуется много минут (или много вызовов poll ()) для завершенияназначение разделов.</p>