Я перебираю с React core и пытаюсь создать поток * hot бесконечный с несколькими подписчиками:
@Test
public void hotStream() throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
Flux<Object> flux = Flux
.create(fluxSink -> {
while (counter.incrementAndGet() <= 10) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException ignored) {
}
fluxSink.next(System.currentTimeMillis());
}
}).log()
.publish()
.autoConnect();
// first subscriber
new Thread(() -> {
log.info("[S] Subscribing first");
flux.log().subscribe(s -> log.info("[*] S1: {}", s));
}).start();
Thread.sleep(450);
// second subscriber
new Thread(() -> {
log.info("[S] Subscribing second");
flux.log().subscribe(s -> log.info("[*] S2: {}", s));
}).start();
}
Но когда второй подписчикподписался на поток, тогда процесс останавливается:
00:56:03.231 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
00:56:03.261 [Thread-0] INFO io.github.cepr0.demo.ReactiveTests - [S] Subscribing first
00:56:03.273 [Thread-0] INFO reactor.Flux.AutoConnect.2 - onSubscribe(FluxPublish.PublishInner)
00:56:03.279 [Thread-0] INFO reactor.Flux.AutoConnect.2 - request(unbounded)
00:56:03.286 [Thread-0] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
00:56:03.292 [Thread-0] INFO reactor.Flux.Create.1 - request(256)
00:56:03.403 [Thread-0] INFO reactor.Flux.Create.1 - onNext(1526939763402)
00:56:03.403 [Thread-0] INFO reactor.Flux.AutoConnect.2 - onNext(1526939763402)
00:56:03.403 [Thread-0] INFO io.github.cepr0.demo.ReactiveTests - [*] S1: 1526939763402
00:56:03.403 [Thread-0] INFO reactor.Flux.Create.1 - request(1)
00:56:03.504 [Thread-0] INFO reactor.Flux.Create.1 - onNext(1526939763504)
00:56:03.504 [Thread-0] INFO reactor.Flux.AutoConnect.2 - onNext(1526939763504)
00:56:03.504 [Thread-0] INFO io.github.cepr0.demo.ReactiveTests - [*] S1: 1526939763504
00:56:03.504 [Thread-0] INFO reactor.Flux.Create.1 - request(1)
00:56:03.605 [Thread-0] INFO reactor.Flux.Create.1 - onNext(1526939763605)
00:56:03.605 [Thread-0] INFO reactor.Flux.AutoConnect.2 - onNext(1526939763605)
00:56:03.605 [Thread-0] INFO io.github.cepr0.demo.ReactiveTests - [*] S1: 1526939763605
00:56:03.605 [Thread-0] INFO reactor.Flux.Create.1 - request(1)
00:56:03.706 [Thread-0] INFO reactor.Flux.Create.1 - onNext(1526939763706)
00:56:03.706 [Thread-0] INFO reactor.Flux.AutoConnect.2 - onNext(1526939763706)
00:56:03.707 [Thread-0] INFO io.github.cepr0.demo.ReactiveTests - [*] S1: 1526939763706
00:56:03.707 [Thread-0] INFO reactor.Flux.Create.1 - request(1)
00:56:03.713 [Thread-1] INFO io.github.cepr0.demo.ReactiveTests - [S] Subscribing second
00:56:03.714 [Thread-1] INFO reactor.Flux.AutoConnect.3 - onSubscribe(FluxPublish.PublishInner)
00:56:03.714 [Thread-1] INFO reactor.Flux.AutoConnect.3 - request(unbounded)
Я ожидал, что второй подписчик начнет читать поток вместе с первым подписчиком .Но этого не происходит.
Может кто-нибудь объяснить, почему это происходит и как это решить?
ОБНОВЛЕНО
Все, что мне нужно было сделатьдобавьте следующую строку в конец теста, чтобы предотвратить его остановку:
Thread.currentThread().join();