Springboot облако Стрим с Кафкой - PullRequest
0 голосов
/ 10 декабря 2018

Я пытаюсь настроить проект с облаком Springboot Stream с Kafka.Мне удалось создать простой пример, когда слушатель получает сообщения из темы и после обработки отправляет вывод в другую тему.

Мой слушатель и каналы настроены так:

@Component
public class FileEventListener {
    private FileEventProcessorService fileEventProcessorService;

    @Autowired
    public FileEventListener(FileEventProcessorService fileEventProcessorService) {
        this.fileEventProcessorService = fileEventProcessorService;
    }

    @StreamListener(target = FileEventStreams.INPUT)
    public void handleLine(@Payload(required = false) String jsonData) {
        this.fileEventProcessorService.process(jsonData);
    }
}

public interface FileEventStreams {
    String INPUT = "file_events";
    String OUTPUT = "raw_lines";

    @Input(INPUT)
    SubscribableChannel inboundFileEventChannel();

    @Output(OUTPUT)
    MessageChannel outboundRawLinesChannel();
}

Проблема с этим примером заключается в том, что при запуске службы она не проверяет сообщения, уже существующие в теме, она обрабатывает только те сообщения, которые отправлены после ее запуска.Я очень плохо знаком с потоком Springboot и kafka, но для того, что я прочитал, это поведение может соответствовать тому факту, что я использую SubscribableChannel.Я попытался использовать QueueChannel, например, чтобы увидеть, как это работает, но я нашел следующее исключение:

Error creating bean with name ... nested exception is java.lang.IllegalStateException: No factory found for binding target type: org.springframework.integration.channel.QueueChannel among registered factories: channelFactory,messageSourceFactory

Итак, мои вопросы:

  1. Если я хочучтобы обработать все сообщения, которые существуют в теме, после запуска приложения (а также сообщения обрабатываются только одним потребителем), я нахожусь на правильном пути?
  2. Даже если QueueChannel не является правильным выбором длядобиться поведения, описанного в 1.) Что я должен добавить в свой проект, чтобы иметь возможность использовать этот тип канала?

Спасибо!

1 Ответ

0 голосов
/ 10 декабря 2018
  1. Добавить spring.cloud.stream.bindings.file_events.group=foo

    • анонимные группы потребляют только с конца темы, привязки с группой потребляют с начала, по умолчанию.
  2. Вы не можете использовать PollableChannel для привязки, это должно быть SubscribableChannel.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...