Публикация на реакторе Асинхронный поток и блокировка потоков - PullRequest
0 голосов
/ 10 октября 2018

Я пытаюсь создать сценарий, который состоит из следующих шагов:

  1. (на Init) Flux издатель создал
  2. (на Init) Подписчики subscribe
  3. (от действия пользователя) издатель начинает потоковую передачу / публикацию событий
  4. Подписчик веб-контроллера получает и кэширует последние BUFFER_SIZE события

На основе http://projectreactor.io/docs/core/release/reference/#advanced-parallelizing-parralelflux и https://www.baeldung.com/reactor-core Я пытаюсь использовать для этого create и publish, и проблема, с которой я сталкиваюсь, заключается в том, что поток, вызывающий flux.connect, находится внутри цикла while внутрииздатель.

Вот минимальный рабочий пример использования spring-boot-starter-webflux:

private ConnectableFlux<Integer> flux;
private Scheduler scheduler;
private int nextRead = 0;

private static final int BUFFERSIZE = 100;
private List<Integer> sink = new LinkedList<Integer>() ;

@PostConstruct
public void Init() {
    this.scheduler = Schedulers.newSingle("Streamer");

    flux = Flux.<Integer>create(fluxSink -> {
        while (true) {
            fluxSink.next(nextRead++);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).publishOn(scheduler).publish();
}

@GetMapping("/subscribe")
public void subscribe(){
    this.flux.subscribeOn(scheduler,false).subscribe(new CoreSubscriber<Integer>() {
        @Override
        public Context currentContext() {
            return null;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer e) {
            while (sink.size() >= BUFFERSIZE) sink.remove(0);
            sink.add(e);
            logger.debug("sink event: " + e);
        }

        @Override
        public void onError(Throwable t) {}

        @Override
        public void onComplete() {}
    });
}

@GetMapping("/start")
public void startStream(){
    logger.debug("EventStreamSimulator startStream before connect");
    this.flux.connect();
    logger.debug("EventStreamSimulator startStream after connect");
}

@GetMapping("/values")
public Flux<Integer> getEvents(){
    return Flux.fromIterable(sink);
}

На основе этого кода веб-запрос на /start начнет потоковую передачу, но httpнить заклинивает в эмиттере бесконечный цикл.запросы на /values и ведение журнала показывают, что все работает нормально (но исходные http request до /start никогда не завершаются / не возвращаются)

Образцы журналов:

2018-10-09 18: 12: 54.798 DEBUG 6024 --- [ctor-http-nio-2] com.example.FluxPocController: emmit event: 0

2018-10-09 18: 12: 54.798 DEBUG 6024--- [Streamer-1] com.example.FluxPocController: событие приемника: 0

Тогда , вот вопрос: поддерживается ли директива publishOn для этих асинхронных операций?способ использования Flux.create?если да, то как его использовать?

...