Что означает «подписная нить» в RxJava - PullRequest
0 голосов
/ 09 января 2019

Я читал книгу по RxJava, и вот выдержка из нее:

Observable.create(s -> {
   ... async subscription and data emission ...
})
.doOnNext(i -> System.out.println(Thread.currentThread()))
.filter(i -> i % 2 == 0)
.map(i -> "Value " + i + " processed on " + Thread.currentThread())
.subscribe(s -> System.out.println("SOME VALUE =>" + s));
System.out.println("Will print BEFORE values are emitted")”

Он пишет, что поскольку операция асинхронная, значения будут передаваться в потоке, отличном от потока подписчика. Поскольку поток подписчика не блокируется, будет напечатано Will print BEFORE values are emitted, прежде чем какое-либо значение будет отправлено подписчику.

Я не могу понять, что это за подписка? Могу ли я получить его имя как-нибудь. Чтобы проверить, что говорит автор, я написал код:

Observable.create(
            subscriber -> {
              System.out.println("Entering thread: " + Thread.currentThread().getName());
              Thread t =
                  new Thread(
                      new Runnable() {
                        @Override
                        public void run() {
                          try {
                            System.out.println(
                                "Emitting threads: " + Thread.currentThread().getName());
                            Thread.sleep(5000);
                            subscriber.onNext(1);
                            subscriber.onNext(2);
                            subscriber.onNext(3);
                            subscriber.onCompleted();
                          } catch (InterruptedException e) {
                            subscriber.onError(e);
                          }
                        }
                      });
              t.start();
            })
        .subscribe(
            a -> {
              System.out.print("Subscriber thread: " + Thread.currentThread().getName() + " ");
              System.out.println(a);
            });
    System.out.println("Main thread exiting:");

Когда я запускаю приведенный выше код, я обнаруживаю, что поток подписчика совпадает с потоком, в котором вызывается onNext(). Поскольку это асинхронная операция, элементы должны были отправляться в потоке, отличном от потока подписчика. Почему оба потока одинаковы?

Кроме того, даже после выхода из основного потока программа продолжает работать и завершается только после того, как все элементы будут нажаты с помощью onNext (). Почему это происходит? Почему программа не закрывается?

Ответы [ 2 ]

0 голосов
/ 10 января 2019

Поток подписчика - это поток, который вызывает subscribe. Поскольку поток RxJava представляет собой цепочку таких вызовов подписки, эти вызовы могут быть перемещены в другие потоки в последовательности с помощью subscribeOn.

Например:

Observable.fromCallable(() -> Thread.currentThread())
.subscribe(System.out::println);

System.out.println("Subscriber thread: " + Thread.currentThread());

Это напечатает поток подписчика дважды. Альтернативно

Observable.fromCallable(() -> Thread.currentThread())
.subscribeOn(Schedulers.single())
.subscribe(System.out::println);

System.out.println("Subscriber thread: " + Thread.currentThread());

Thread.sleep(1000);

Это напечатает поток подписчика и поток RxSingleScheduler.

0 голосов
/ 09 января 2019

По умолчанию Rx является однопоточным, что подразумевает, что Observable и цепочка операторов, которые мы можем применить к нему, будут уведомлять своих наблюдателей в том же потоке, в котором вызывается его метод subscribe().

...