Я читал книгу по RxJava, и вот выдержка из нее:
Observable.create(s -> {
... async subscription and data emission ...
})
.doOnNext(i -> System.out.println(Thread.currentThread()))
.filter(i -> i % 2 == 0)
.map(i -> "Value " + i + " processed on " + Thread.currentThread())
.subscribe(s -> System.out.println("SOME VALUE =>" + s));
System.out.println("Will print BEFORE values are emitted")”
Он пишет, что поскольку операция асинхронная, значения будут передаваться в потоке, отличном от потока подписчика. Поскольку поток подписчика не блокируется, будет напечатано Will print BEFORE values are emitted
, прежде чем какое-либо значение будет отправлено подписчику.
Я не могу понять, что это за подписка? Могу ли я получить его имя как-нибудь. Чтобы проверить, что говорит автор, я написал код:
Observable.create(
subscriber -> {
System.out.println("Entering thread: " + Thread.currentThread().getName());
Thread t =
new Thread(
new Runnable() {
@Override
public void run() {
try {
System.out.println(
"Emitting threads: " + Thread.currentThread().getName());
Thread.sleep(5000);
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
});
t.start();
})
.subscribe(
a -> {
System.out.print("Subscriber thread: " + Thread.currentThread().getName() + " ");
System.out.println(a);
});
System.out.println("Main thread exiting:");
Когда я запускаю приведенный выше код, я обнаруживаю, что поток подписчика совпадает с потоком, в котором вызывается onNext()
. Поскольку это асинхронная операция, элементы должны были отправляться в потоке, отличном от потока подписчика. Почему оба потока одинаковы?
Кроме того, даже после выхода из основного потока программа продолжает работать и завершается только после того, как все элементы будут нажаты с помощью onNext (). Почему это происходит? Почему программа не закрывается?