Подписка на источник Hot Observable, затем сигнализация на тот же источник - PullRequest
1 голос
/ 22 марта 2020

Tl; dr:

Как сигнализировать onNext в HotObservable ПОСЛЕ того, как я подписан на ту же горячую наблюдаемую?

Более длинная версия:

Моя цель - создать Конечная точка REST, которая генерирует событие, затем ждет, прежде чем вернуть ответ (который сам по себе будет событием) - в основном управляемая событиями конечная точка REST с командой / ответом.

Поскольку я не нашел никакого решения, я решил сделать это через реактив java, через отображение событий весны на издателя DirectProcessor (Hot observable). У меня есть:

https://github.com/Venthe/Exploratory-Projects/tree/reactive/reactive

@Slf4j
@Component
class EventDriver {
    private static DirectProcessor<Object> EVENTS = DirectProcessor.create();

    @EventListener(Object.class)
    public void onEvent(Object event) {
        log.info(MessageFormat.format("Event {0} is ready to be pushed into processor.", event.toString()));
        EVENTS.onNext(event);
    }

    // Not important, but it does show that my events are captured & processed via 'normal' subscription
    @EventListener(ApplicationStartedEvent.class)
    public void logger() {
        EVENTS.map(Object::toString).subscribe(e -> log.info(MessageFormat.format("Event {0} received by processor.", e)));
    }

    public Flux<Object> getEvents() {
        log.info("Requesting processor.");
        return EVENTS
                .doOnEach(l -> log.info(MessageFormat.format("Processing subscribed event {0}", l.toString())));
    }
}
@RestController
@RequiredArgsConstructor
class ReservationRestController {
    private final ApplicationEventPublisher eventDispatcher;
    private final EventDriver eventDriver;

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/wait-for-event/{name}")
    public Flux<String> performAction(@PathVariable String name) {
        return eventDriver.getEvents()
                .doOnSubscribe(s -> eventDispatcher.publishEvent(new MyEvent(name)))
                .map(Object::toString)
                .log();
    }

    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    class MyEvent {
        private String name;
    }
}

Я понимаю: когда я открываю конечную точку REST \wait-for-event\any, метод Reactive Web performAction если FIRST подпишется на события, ТО - событие отправки (и, следовательно, - запуск конвейера), из-за doOnSubscribe (добавление поведения (побочный эффект), запускаемое, когда подписка {@link Flux} завершена) *

К сожалению, события застряли - подписка не видит мое событие, вот журнал

 : Requesting processor.
 : Event ReservationRestController.MyEvent(name=as3) is ready to be pushed into processor.
 : Event ReservationRestController.MyEvent(name=as3) received by processor.
 : | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
 : | request(1)

Насколько я понимаю, я отправляю до того, как onSubscribe действительно вызывается.

Of Конечно, любое другое событие, которое произойдет после, будет правильно перехвачено.

...