У меня есть служба источника данных, которая принимает в качестве параметра наблюдателя.
void subscribe(Consumer onEventConsumer);
Я хочу использовать поток в качестве потока ответа для RSocket. Как я могу это сделать? Как я вижу сейчас, это должно быть что-то вроде
Flux<T> controllerMethod(RequestMessage mgs) {
var flux = Flux.empty();
dataSource.subscribe(event -> flux.push(event));
return flux;
}
Но я сильно сомневаюсь, что это правильное решение, и я новичок в реактивном подходе, я не знаю, какие методы мне следует использовать здесь?