Я пытаюсь создать сценарий, который состоит из следующих шагов:
- (на
Init
) Flux
издатель создал - (на
Init
) Подписчики subscribe
- (от действия пользователя) издатель начинает потоковую передачу / публикацию событий
- Подписчик веб-контроллера получает и кэширует последние
BUFFER_SIZE
события
На основе http://projectreactor.io/docs/core/release/reference/#advanced-parallelizing-parralelflux и https://www.baeldung.com/reactor-core Я пытаюсь использовать для этого create
и publish
, и проблема, с которой я сталкиваюсь, заключается в том, что поток, вызывающий flux.connect
, находится внутри цикла while внутрииздатель.
Вот минимальный рабочий пример использования spring-boot-starter-webflux
:
private ConnectableFlux<Integer> flux;
private Scheduler scheduler;
private int nextRead = 0;
private static final int BUFFERSIZE = 100;
private List<Integer> sink = new LinkedList<Integer>() ;
@PostConstruct
public void Init() {
this.scheduler = Schedulers.newSingle("Streamer");
flux = Flux.<Integer>create(fluxSink -> {
while (true) {
fluxSink.next(nextRead++);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).publishOn(scheduler).publish();
}
@GetMapping("/subscribe")
public void subscribe(){
this.flux.subscribeOn(scheduler,false).subscribe(new CoreSubscriber<Integer>() {
@Override
public Context currentContext() {
return null;
}
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer e) {
while (sink.size() >= BUFFERSIZE) sink.remove(0);
sink.add(e);
logger.debug("sink event: " + e);
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
}
@GetMapping("/start")
public void startStream(){
logger.debug("EventStreamSimulator startStream before connect");
this.flux.connect();
logger.debug("EventStreamSimulator startStream after connect");
}
@GetMapping("/values")
public Flux<Integer> getEvents(){
return Flux.fromIterable(sink);
}
На основе этого кода веб-запрос на /start
начнет потоковую передачу, но http
нить заклинивает в эмиттере бесконечный цикл.запросы на /values
и ведение журнала показывают, что все работает нормально (но исходные http request
до /start
никогда не завершаются / не возвращаются)
Образцы журналов:
2018-10-09 18: 12: 54.798 DEBUG 6024 --- [ctor-http-nio-2] com.example.FluxPocController: emmit event: 0
2018-10-09 18: 12: 54.798 DEBUG 6024--- [Streamer-1] com.example.FluxPocController: событие приемника: 0
Тогда , вот вопрос: поддерживается ли директива publishOn
для этих асинхронных операций?способ использования Flux.create
?если да, то как его использовать?