У меня есть обработчик kafka в весенней загрузке:
@KafkaListener(topics = "topic-one", groupId = "response")
public void listen(String response) {
myService.processResponse(response);
}
Например, производитель отправляет одно сообщение каждую секунду. Но myService.processResponse
работают 10 секунд. Мне нужно обработать каждое сообщение и начать myService.processResponse
в новой теме. Я могу создать своего исполнителя и делегировать каждый ответ на него. Но я думаю, что для них есть другие конфиги в кафке. Я нашел 2:
1) добавить concurrency = "5"
к аннотации @KafkaListener
- похоже, работает. Но я не уверен, насколько правильно, потому что у меня есть второй путь:
2) Я могу создать ConcurrentKafkaListenerContainerFactory
и установить для него ConsumerFactory
и concurrency
Я не понимаю разницы между этими методами? достаточно ли просто добавить concurrency = "5"
к аннотации @KafkaListener
или мне нужно создать ConcurrentKafkaListenerContainerFactory
?
Или я вообще ничего не понимаю и есть другой способ?