Реактор проекта: подписка () после того, как Flux.create () не использует поток планировщика - PullRequest
0 голосов
/ 23 октября 2018

Когда я запускаю приведенный ниже код, я ожидаю, что оба подписчика получат свою собственную эластичную нить.Однако они не всегда делают это.Например, в моей системе, если я раскомментирую строку thread.Sleep(100), мои операторы печати указывают, что потребители получают свои данные в главном потоке.Если оставить комментарий, я увижу ожидаемый результат: каждый потребитель получает свои данные в своей эластичной нити.Почему я вижу это недетерминированное поведение?Как я злоупотребляю API?

    List<FluxSink<String>> holder = new ArrayList<>();
    ConnectableFlux<String> connect = Flux. 
           <String>create(holder::add).replay();
    connect.connect();
    Flux<String> flux = connect.subscribeOn(Schedulers.elastic());
    FluxSink<String> sink = holder.get(0);

    flux.subscribe(c -> {
       System.out.println(Thread.currentThread().getName() + ", " + 
         "consumer 1 says " + c);
    });

    flux.subscribe(c -> {
        System.out.println(Thread.currentThread().getName() + ", " + 
          "consumer 2 says " + c);
    });
    // When uncommented, subscribers receive on elastic threads; else,
    // on the main thread, as if I had chosen schedulers.immediate()
    //
    // Thread.sleep(100);
    sink.next("Hi!");

    sink.complete();

    Thread.sleep(1000);

    //output with Thread.sleep(100):
    // main, consumer 1 says Hi!
    // main, consumer 2 says Hi!
    //
    //output without Thread.sleep(100):
    // elastic-2, consumer 1 says Hi!
    // elastic-3, consumer 2 says Hi!

Чего я хотел бы добиться, так это горячего потока с несколькими подписчиками и каждым подписчиком в отдельном потоке.

...