Как обращаться с sse соединением закрытым? - PullRequest
0 голосов
/ 29 июня 2018

У меня есть конечная точка, как в примере блока кода. При потоковой передаче я вызываю асинхронный метод через streamHelper.getStreamSuspendCount(). Я остановил этот асинхронный метод при изменении состояния. Но я не могу получить доступ к этому асинхронному методу, когда браузер закрыт и сеанс завершен. Я останавливаю асинхронный метод в области видимости при изменении состояния. Но я не могу получить доступ к этому асинхронному методу, когда браузер закрыт и сеанс завершен. Как я могу получить доступ к этой области, когда сессия закрыта?

@RequestMapping(value = "/stream/{columnId}/suspendCount", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Integer> suspendCount(@PathVariable String columnId) {
    ColumnObject columnObject = streamHelper.findColumnObjectInListById(columnId);
    return streamHelper.getStreamSuspendCount(columnObject);
}


getStreamSuspendCount(ColumnObject columnObject) {
   ...
   //async flux
   Flux<?> newFlux = beSubscribeFlow.get(i);
   Disposable disposable = newFlux.subscribe();
   beDisposeFlow.add(disposable); // my session scope variable. if change state, i will kill disposable (dispose()).
   ...
   return Flux.fromStream(Stream.generate(() -> columnObject.getPendingObject().size())).distinctUntilChanged()
                    .doOnNext(i -> {
                        System.out.println(i);
                    }));
}

1 Ответ

0 голосов
/ 29 июня 2018

Я думаю, что отчасти проблема в том, что вы пытаетесь получить Disposable, который вы хотите вызвать в конце сеанса. Но при этом вы подписываетесь на последовательность самостоятельно. Spring Framework также подпишется на Flux, возвращаемое getStreamSuspendCount, и именно эту подписку необходимо отменить, чтобы клиент SSE получил уведомление.

Теперь, как этого добиться? То, что вам нужно, это своего рода «клапан», который отменит свой источник при получении внешнего сигнала. Это то, что делает takeUntilOther(Publisher<?>).

Так что теперь вам нужен Publisher<?>, который вы можете привязать к жизненному циклу сеанса (точнее, к событию закрытия сеанса): как только он выдаст, takeUntilOther отменит свой источник.

2 варианта там:

  • событие закрытия сеанса предоставляется в API, похожем на слушатель: используйте Mono.create
  • вам действительно нужно вручную отменить отмену: используйте MonoProcessor.create(), и когда придет время, протолкните любое значение через него

Вот упрощенные примеры с составленными API для пояснения:

Создать

return theFluxForSSE.takeUntilOther(Mono.create(sink ->
    sessionEvent.registerListenerForClose(closeEvent -> sink.success(closeEvent))
));

MonoProcessor

MonoProcessor<String> processor = MonoProcessor.create();
beDisposeFlow.add(processor); // make it available to your session scope?
return theFluxForSSE.takeUntilOther(processor); //Spring will subscribe to this

Давайте смоделируем закрытие сессии с помощью запланированного задания:

Executors.newSingleThreadScheduledExecutor().schedule(() ->
    processor.onNext("STOP") // that's the key part: manually sending data through the processor to signal takeUntilOther
, 2, TimeUnit.SECONDS);

Вот пример смоделированного модульного теста, который вы можете запустить, чтобы лучше понять, что происходит:

@Test
public void simulation() {
    Flux<Long> theFluxForSSE = Flux.interval(Duration.ofMillis(100));

    MonoProcessor<String> processor = MonoProcessor.create();
    Executors.newSingleThreadScheduledExecutor().schedule(() -> processor.onNext("STOP"), 2, TimeUnit.SECONDS);

    theFluxForSSE.takeUntilOther(processor.log())
                 .log()
                 .blockLast();
}
...