Получение предметов, излучаемых из наблюдаемой - PullRequest
0 голосов
/ 21 ноября 2018

Вот мой код:

  public static void main(String[] args) {
    Observable.just("747", "737", "777")
        .flatMap(
            a -> {
              try {
                Thread.sleep(5000);
              } catch (InterruptedException e) {
                e.printStackTrace();
              }
              return Observable.just(a).subscribeOn(Schedulers.newThread());
            })
        .subscribe(p -> System.out.println("Received " + p + " on thread " + Thread.currentThread().getName()));
  }

Как я понял, каждый из элементов наблюдаемого должен будет выполняться в отдельном потоке (что происходит), и результаты будут отправляться в тот жепоток, который сделал работу (это происходит также).Но что я не могу понять, так это то, что поток main не завершает работу и не ожидает завершения фоновых потоков?Программа продолжается до тех пор, пока работает каждый из фоновых потоков.

1 Ответ

0 голосов
/ 22 ноября 2018

Если вы посмотрите на дамп потока, вы увидите, что поток main фактически застрял в операторе sleep.Вот почему он не завершается.

Это потому, что именно поток выполняет оператор flatMap, поэтому он застревает.По этой же причине запуск кода занимает много времени.Вы можете легко проверить это, вставив оператор печати непосредственно перед sleep:

try {
  System.out.println(Thread.currentThread().getName() + " is sleeping");
  Thread.sleep(5000);
} catch (InterruptedException e) {
  e.printStackTrace();
}

Вывод выглядит примерно так:

main is sleeping
main is sleeping
Received 747 on thread RxNewThreadScheduler-1
main is sleeping
Received 737 on thread RxNewThreadScheduler-2
Received 777 on thread RxNewThreadScheduler-3

Я думаю, что вы хотели написать что-тонапример:

System.out.println(Thread.currentThread().getName()  + " is creating the observable");
Observable.just("747", "737", "777")
        .flatMap(a ->
                 Observable.fromCallable(() -> {
                  try {
                    System.out.println(Thread.currentThread().getName() + " is sleeping");
                    Thread.sleep(5000);
                  } catch (InterruptedException e) {
                    e.printStackTrace();
                  }
                  return a;
                }).subscribeOn(Schedulers.newThread())
      ).subscribe(p -> System.out.println("Received " + p + " on thread " + Thread.currentThread().getName()));
System.out.println(Thread.currentThread().getName() + " is going to exit");

Вывод:

main is creating the observable
main is going to exit
RxNewThreadScheduler-3 is sleeping
RxNewThreadScheduler-2 is sleeping
RxNewThreadScheduler-1 is sleeping
Received 777 on thread RxNewThreadScheduler-3
Received 747 on thread RxNewThreadScheduler-1
Received 737 on thread RxNewThreadScheduler-1

В этой версии main завершает работу сразу после создания Observable.

...