Я использую @KafkaListener и ConcurrentKafkaListenerContainerFactory для прослушивания 3 тем kafka, и каждая тема имеет 10 разделов. У меня есть несколько вопросов о том, как это работает.
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(30);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
@KafkaListener(topics = "topic1", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
@KafkaListener(topics = "topic2", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
@KafkaListener(topics = "topic3", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
мой listener.ackmode имеет значение return , а enable.auto.commit имеет значение false и partition.assignment.strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
1) мое понимание параллелизма таково, поскольку я установил для моего параллелизма (на уровне фабрики) значение 30, и у меня есть в общей сложности 30 разделов (для всех трехтема вместе) для чтения, каждому потоку будет назначен один раздел. Правильно ли мое понимание? как это повлияет, если я снова переопределю параллелизм внутри аннотации @KafkaListener?
2) Когда Spring вызывает метод poll (), он опрашивает все три темы?
3) с момента установкиlistener.ackmode настроен на возврат, будет ли он ждать, пока все записи, которые были возвращены в одном poll (), завершены, прежде чем выдавать следующий poll ()? И что произойдет, если мои записи будут обрабатываться дольше, чем max.poll.interval.ms? Допустим, в одном вызове poll () возвращается 1-100 смещений, и мой код может обработать только 50 до того, как будет достигнуто значение max.poll.interval.ms, в этот раз будет проведен другой опрос, поскольку он уже достиг максимального значения. .interval.ms? если это так, то следующий опрос () вернет записи со смещением 51?
очень ценю ваше время и помощь