Я пытаюсь настроить проект с облаком Springboot Stream с Kafka.Мне удалось создать простой пример, когда слушатель получает сообщения из темы и после обработки отправляет вывод в другую тему.
Мой слушатель и каналы настроены так:
@Component
public class FileEventListener {
private FileEventProcessorService fileEventProcessorService;
@Autowired
public FileEventListener(FileEventProcessorService fileEventProcessorService) {
this.fileEventProcessorService = fileEventProcessorService;
}
@StreamListener(target = FileEventStreams.INPUT)
public void handleLine(@Payload(required = false) String jsonData) {
this.fileEventProcessorService.process(jsonData);
}
}
public interface FileEventStreams {
String INPUT = "file_events";
String OUTPUT = "raw_lines";
@Input(INPUT)
SubscribableChannel inboundFileEventChannel();
@Output(OUTPUT)
MessageChannel outboundRawLinesChannel();
}
Проблема с этим примером заключается в том, что при запуске службы она не проверяет сообщения, уже существующие в теме, она обрабатывает только те сообщения, которые отправлены после ее запуска.Я очень плохо знаком с потоком Springboot и kafka, но для того, что я прочитал, это поведение может соответствовать тому факту, что я использую SubscribableChannel
.Я попытался использовать QueueChannel
, например, чтобы увидеть, как это работает, но я нашел следующее исключение:
Error creating bean with name ... nested exception is java.lang.IllegalStateException: No factory found for binding target type: org.springframework.integration.channel.QueueChannel among registered factories: channelFactory,messageSourceFactory
Итак, мои вопросы:
- Если я хочучтобы обработать все сообщения, которые существуют в теме, после запуска приложения (а также сообщения обрабатываются только одним потребителем), я нахожусь на правильном пути?
- Даже если
QueueChannel
не является правильным выбором длядобиться поведения, описанного в 1.) Что я должен добавить в свой проект, чтобы иметь возможность использовать этот тип канала?
Спасибо!