Tl; dr:
Как сигнализировать onNext в HotObservable ПОСЛЕ того, как я подписан на ту же горячую наблюдаемую?
Более длинная версия:
Моя цель - создать Конечная точка REST, которая генерирует событие, затем ждет, прежде чем вернуть ответ (который сам по себе будет событием) - в основном управляемая событиями конечная точка REST с командой / ответом.
Поскольку я не нашел никакого решения, я решил сделать это через реактив java, через отображение событий весны на издателя DirectProcessor (Hot observable). У меня есть:
https://github.com/Venthe/Exploratory-Projects/tree/reactive/reactive
@Slf4j
@Component
class EventDriver {
private static DirectProcessor<Object> EVENTS = DirectProcessor.create();
@EventListener(Object.class)
public void onEvent(Object event) {
log.info(MessageFormat.format("Event {0} is ready to be pushed into processor.", event.toString()));
EVENTS.onNext(event);
}
// Not important, but it does show that my events are captured & processed via 'normal' subscription
@EventListener(ApplicationStartedEvent.class)
public void logger() {
EVENTS.map(Object::toString).subscribe(e -> log.info(MessageFormat.format("Event {0} received by processor.", e)));
}
public Flux<Object> getEvents() {
log.info("Requesting processor.");
return EVENTS
.doOnEach(l -> log.info(MessageFormat.format("Processing subscribed event {0}", l.toString())));
}
}
@RestController
@RequiredArgsConstructor
class ReservationRestController {
private final ApplicationEventPublisher eventDispatcher;
private final EventDriver eventDriver;
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/wait-for-event/{name}")
public Flux<String> performAction(@PathVariable String name) {
return eventDriver.getEvents()
.doOnSubscribe(s -> eventDispatcher.publishEvent(new MyEvent(name)))
.map(Object::toString)
.log();
}
@AllArgsConstructor
@NoArgsConstructor
@Data
class MyEvent {
private String name;
}
}
Я понимаю: когда я открываю конечную точку REST \wait-for-event\any
, метод Reactive Web performAction
если FIRST подпишется на события, ТО - событие отправки (и, следовательно, - запуск конвейера), из-за doOnSubscribe
(добавление поведения (побочный эффект), запускаемое, когда подписка {@link Flux} завершена) *
К сожалению, события застряли - подписка не видит мое событие, вот журнал
: Requesting processor.
: Event ReservationRestController.MyEvent(name=as3) is ready to be pushed into processor.
: Event ReservationRestController.MyEvent(name=as3) received by processor.
: | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
: | request(1)
Насколько я понимаю, я отправляю до того, как onSubscribe действительно вызывается.
Of Конечно, любое другое событие, которое произойдет после, будет правильно перехвачено.