Я думаю, что отчасти проблема в том, что вы пытаетесь получить Disposable
, который вы хотите вызвать в конце сеанса. Но при этом вы подписываетесь на последовательность самостоятельно. Spring Framework также подпишется на Flux
, возвращаемое getStreamSuspendCount
, и именно эту подписку необходимо отменить, чтобы клиент SSE получил уведомление.
Теперь, как этого добиться? То, что вам нужно, это своего рода «клапан», который отменит свой источник при получении внешнего сигнала. Это то, что делает takeUntilOther(Publisher<?>)
.
Так что теперь вам нужен Publisher<?>
, который вы можете привязать к жизненному циклу сеанса (точнее, к событию закрытия сеанса): как только он выдаст, takeUntilOther
отменит свой источник.
2 варианта там:
- событие закрытия сеанса предоставляется в API, похожем на слушатель: используйте
Mono.create
- вам действительно нужно вручную отменить отмену: используйте
MonoProcessor.create()
, и когда придет время, протолкните любое значение через него
Вот упрощенные примеры с составленными API для пояснения:
Создать
return theFluxForSSE.takeUntilOther(Mono.create(sink ->
sessionEvent.registerListenerForClose(closeEvent -> sink.success(closeEvent))
));
MonoProcessor
MonoProcessor<String> processor = MonoProcessor.create();
beDisposeFlow.add(processor); // make it available to your session scope?
return theFluxForSSE.takeUntilOther(processor); //Spring will subscribe to this
Давайте смоделируем закрытие сессии с помощью запланированного задания:
Executors.newSingleThreadScheduledExecutor().schedule(() ->
processor.onNext("STOP") // that's the key part: manually sending data through the processor to signal takeUntilOther
, 2, TimeUnit.SECONDS);
Вот пример смоделированного модульного теста, который вы можете запустить, чтобы лучше понять, что происходит:
@Test
public void simulation() {
Flux<Long> theFluxForSSE = Flux.interval(Duration.ofMillis(100));
MonoProcessor<String> processor = MonoProcessor.create();
Executors.newSingleThreadScheduledExecutor().schedule(() -> processor.onNext("STOP"), 2, TimeUnit.SECONDS);
theFluxForSSE.takeUntilOther(processor.log())
.log()
.blockLast();
}