Когда я запускаю приведенный ниже код, я ожидаю, что оба подписчика получат свою собственную эластичную нить.Однако они не всегда делают это.Например, в моей системе, если я раскомментирую строку thread.Sleep(100)
, мои операторы печати указывают, что потребители получают свои данные в главном потоке.Если оставить комментарий, я увижу ожидаемый результат: каждый потребитель получает свои данные в своей эластичной нити.Почему я вижу это недетерминированное поведение?Как я злоупотребляю API?
List<FluxSink<String>> holder = new ArrayList<>();
ConnectableFlux<String> connect = Flux.
<String>create(holder::add).replay();
connect.connect();
Flux<String> flux = connect.subscribeOn(Schedulers.elastic());
FluxSink<String> sink = holder.get(0);
flux.subscribe(c -> {
System.out.println(Thread.currentThread().getName() + ", " +
"consumer 1 says " + c);
});
flux.subscribe(c -> {
System.out.println(Thread.currentThread().getName() + ", " +
"consumer 2 says " + c);
});
// When uncommented, subscribers receive on elastic threads; else,
// on the main thread, as if I had chosen schedulers.immediate()
//
// Thread.sleep(100);
sink.next("Hi!");
sink.complete();
Thread.sleep(1000);
//output with Thread.sleep(100):
// main, consumer 1 says Hi!
// main, consumer 2 says Hi!
//
//output without Thread.sleep(100):
// elastic-2, consumer 1 says Hi!
// elastic-3, consumer 2 says Hi!
Чего я хотел бы добиться, так это горячего потока с несколькими подписчиками и каждым подписчиком в отдельном потоке.