Как я могу выполнить бесконечный опрос Кафки с помощью Reactor? - PullRequest
0 голосов
/ 23 ноября 2018

Каков реактивный способ реализовать бесконечный цикл опроса с использованием Reactor?В идеале я хотел бы отправлять сообщения из приложения Producer, а приложение Consumer должно бесконечно прослушивать и обрабатывать поток сообщений одинаковым образом каждый раз, когда оно его получает, и отправлять поток результатов обратно.Нужен ли мне цикл блокировки, или есть ли способ поддержать издателя, который может получить поток, обработать его и отправить обратно?

1 Ответ

0 голосов
/ 20 декабря 2018

Попробуйте использовать процессоры при получении данных от Кафки, отправьте их на процессор

      DirectProcessor <Strings> directProcessor = DirectProcessor.create();
    directProcessor.subscribe();

    public void itemEmitt(String string){
        directProcessor.onNext(string);
    }

теперь directprocessor - это поток, который бесконечно прослушивает событие и генерирует событие

...