Ядро Reactor: бесконечный поток не работает с несколькими подписчиками - PullRequest
0 голосов
/ 22 мая 2018

Я перебираю с React core и пытаюсь создать поток * hot бесконечный с несколькими подписчиками:

@Test
public void hotStream() throws InterruptedException {

  AtomicInteger counter = new AtomicInteger();

  Flux<Object> flux = Flux
      .create(fluxSink -> {
        while (counter.incrementAndGet() <= 10) {
          try {
            TimeUnit.MILLISECONDS.sleep(100);
          } catch (InterruptedException ignored) {
          }
          fluxSink.next(System.currentTimeMillis());
        }
      }).log()
      .publish()
      .autoConnect();

  // first subscriber
  new Thread(() -> {
    log.info("[S] Subscribing first");
    flux.log().subscribe(s -> log.info("[*] S1: {}", s));
  }).start();

  Thread.sleep(450);

  // second subscriber
  new Thread(() -> {
    log.info("[S] Subscribing second");
    flux.log().subscribe(s -> log.info("[*] S2: {}", s));
  }).start();
}

Но когда второй подписчикподписался на поток, тогда процесс останавливается:

00:56:03.231 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
00:56:03.261 [Thread-0] INFO io.github.cepr0.demo.ReactiveTests - [S] Subscribing first
00:56:03.273 [Thread-0] INFO reactor.Flux.AutoConnect.2 - onSubscribe(FluxPublish.PublishInner)
00:56:03.279 [Thread-0] INFO reactor.Flux.AutoConnect.2 - request(unbounded)
00:56:03.286 [Thread-0] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
00:56:03.292 [Thread-0] INFO reactor.Flux.Create.1 - request(256)
00:56:03.403 [Thread-0] INFO reactor.Flux.Create.1 - onNext(1526939763402)
00:56:03.403 [Thread-0] INFO reactor.Flux.AutoConnect.2 - onNext(1526939763402)
00:56:03.403 [Thread-0] INFO io.github.cepr0.demo.ReactiveTests - [*] S1: 1526939763402
00:56:03.403 [Thread-0] INFO reactor.Flux.Create.1 - request(1)
00:56:03.504 [Thread-0] INFO reactor.Flux.Create.1 - onNext(1526939763504)
00:56:03.504 [Thread-0] INFO reactor.Flux.AutoConnect.2 - onNext(1526939763504)
00:56:03.504 [Thread-0] INFO io.github.cepr0.demo.ReactiveTests - [*] S1: 1526939763504
00:56:03.504 [Thread-0] INFO reactor.Flux.Create.1 - request(1)
00:56:03.605 [Thread-0] INFO reactor.Flux.Create.1 - onNext(1526939763605)
00:56:03.605 [Thread-0] INFO reactor.Flux.AutoConnect.2 - onNext(1526939763605)
00:56:03.605 [Thread-0] INFO io.github.cepr0.demo.ReactiveTests - [*] S1: 1526939763605
00:56:03.605 [Thread-0] INFO reactor.Flux.Create.1 - request(1)
00:56:03.706 [Thread-0] INFO reactor.Flux.Create.1 - onNext(1526939763706)
00:56:03.706 [Thread-0] INFO reactor.Flux.AutoConnect.2 - onNext(1526939763706)
00:56:03.707 [Thread-0] INFO io.github.cepr0.demo.ReactiveTests - [*] S1: 1526939763706
00:56:03.707 [Thread-0] INFO reactor.Flux.Create.1 - request(1)
00:56:03.713 [Thread-1] INFO io.github.cepr0.demo.ReactiveTests - [S] Subscribing second
00:56:03.714 [Thread-1] INFO reactor.Flux.AutoConnect.3 - onSubscribe(FluxPublish.PublishInner)
00:56:03.714 [Thread-1] INFO reactor.Flux.AutoConnect.3 - request(unbounded)

Я ожидал, что второй подписчик начнет читать поток вместе с первым подписчиком .Но этого не происходит.

Может кто-нибудь объяснить, почему это происходит и как это решить?

ОБНОВЛЕНО

Все, что мне нужно было сделатьдобавьте следующую строку в конец теста, чтобы предотвратить его остановку:

Thread.currentThread().join();
...