Я немного экспериментирую с RxJava.Вот мой код:
public static void main(String[] args) throws InterruptedException {
List<String> l = new ArrayList<>();
l.add("737");
l.add("747");
Observable.from(l)
.doOnNext(
a -> {
System.out.println(
"1: "
+ a
+ ", Thread:"
+ Thread.currentThread().getName()
+ " Timestamp: "
+ System.currentTimeMillis());
})
.flatMap(
a -> {
System.out.println(
"2: "
+ a
+ ", Thread:"
+ Thread.currentThread().getName()
+ " Timestamp: "
+ System.currentTimeMillis());
return Observable.just(a).subscribeOn(Schedulers.newThread());
})
.map(
a -> {
if (a.equals("737")) {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return a;
})
.map(
a -> {
System.out.println(
"3: "
+ a
+ ", Thread: "
+ Thread.currentThread().getName()
+ " Timestamp: "
+ System.currentTimeMillis());
return a.concat(" Boeing");
})
.reduce(
new ArrayList<>(),
(a, b) -> {
System.out.println(
"4: "
+ b
+ ", Thread: "
+ Thread.currentThread().getName()
+ " Timestamp: "
+ System.currentTimeMillis());
a.add(b);
return a;
})
.subscribe(
a -> {
System.out.println(
"4: "
+ a
+ ", Thread: "
+ Thread.currentThread().getName()
+ " Timestamp: "
+ System.currentTimeMillis());
System.out.println("Reduce result is: " + a);
});
Thread.sleep(500000); //to prevent the main thread from exiting
}
Вот вывод кода выше:
1: 737, Thread:main Timestamp: 1546369697211
2: 737, Thread:main Timestamp: 1546369697211
1: 747, Thread:main Timestamp: 1546369697235
2: 747, Thread:main Timestamp: 1546369697235
3: 737, Thread: RxNewThreadScheduler-1 Timestamp: 1546369717238
4: 737 Boeing, Thread: RxNewThreadScheduler-1 Timestamp: 1546369717238
3: 747, Thread: RxNewThreadScheduler-1 Timestamp: 1546369717239
4: 747 Boeing, Thread: RxNewThreadScheduler-1 Timestamp: 1546369717239
4: [737 Boeing, 747 Boeing], Thread: RxNewThreadScheduler-1 Timestamp: 1546369717239
Reduce result is: [737 Boeing, 747 Boeing]
Мой вопрос: работает ли 737 в потоке RxNewThreadScheduler-1
и спит ли этот поток дляЧерез 20 секунд поток 747 должен был продолжить выполнение, верно?Но в выводе мы можем видеть, что вывод, соответствующий '3' для 747, приходит позже, чем для 737. Кроме того, этот оператор вызывается в том же потоке, что и 737.
Точнее, почему«3» для 747 работает только после пробуждения потока для 737?Казалось бы, 747 пытается использовать те же потоки, что и 737. Насколько я понимаю, поток должен был выглядеть так:
747 испускается в потоке B и продолжает выполнение. Операции, соответствующие 747, должны были быть выполнены ранее в другом потоке.
Может кто-нибудь объяснить мне это поведение?