@KafkaListener
- это высокоуровневый API для ConcurrentMessageListenerContainer
, который порождает нескольких внутренних слушателей вокруг KafkaConsumer
.
Разница в том, что KafkaConsumer
API является запрашиваемым по требованию, когда вы вызываете его poll()
всякий раз, когда вам нужно. Абстракция слушателя собирается иметь бесконечный цикл вокруг этого poll()
и генерирует сообщения для записей всякий раз, когда они появляются из poll()
. У нас есть исполнитель задач, который выполняет такую логику:
while (isRunning()) {
try {
pollAndInvoke();
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
// Ignore, we're stopping
}
catch (NoOffsetForPartitionException nofpe) {
this.fatalError = true;
ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
break;
}
catch (Exception e) {
handleConsumerException(e);
}
catch (Error e) { // NOSONAR - rethrown
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
if (runnable != null) {
runnable.run();
}
this.logger.error("Stopping container due to an Error", e);
wrapUp();
throw e;
}
}
KafkaConsumer.poll()
называется в этом pollAndInvoke();
.