Наблюдаемый, не работающий в другом потоке - PullRequest
0 голосов
/ 01 января 2019

Я немного экспериментирую с RxJava.Вот мой код:

public static void main(String[] args) throws InterruptedException {
    List<String> l = new ArrayList<>();
    l.add("737");
    l.add("747");

    Observable.from(l)
        .doOnNext(
            a -> {
              System.out.println(
                  "1: "
                      + a
                      + ", Thread:"
                      + Thread.currentThread().getName()
                      + " Timestamp: "
                      + System.currentTimeMillis());
            })
        .flatMap(
            a -> {
              System.out.println(
                  "2: "
                      + a
                      + ", Thread:"
                      + Thread.currentThread().getName()
                      + " Timestamp: "
                      + System.currentTimeMillis());
              return Observable.just(a).subscribeOn(Schedulers.newThread());
            })
        .map(
            a -> {
              if (a.equals("737")) {
                try {
                  Thread.sleep(20000);
                } catch (InterruptedException e) {
                  e.printStackTrace();
                }
              }
              return a;
            })
        .map(
            a -> {
              System.out.println(
                  "3: "
                      + a
                      + ", Thread: "
                      + Thread.currentThread().getName()
                      + " Timestamp: "
                      + System.currentTimeMillis());
              return a.concat(" Boeing");
            })
        .reduce(
            new ArrayList<>(),
            (a, b) -> {
              System.out.println(
                  "4: "
                      + b
                      + ", Thread: "
                      + Thread.currentThread().getName()
                      + " Timestamp: "
                      + System.currentTimeMillis());
              a.add(b);
              return a;
            })
        .subscribe(
            a -> {
              System.out.println(
                  "4: "
                      + a
                      + ", Thread: "
                      + Thread.currentThread().getName()
                      + " Timestamp: "
                      + System.currentTimeMillis());
              System.out.println("Reduce result is: " + a);
            });

    Thread.sleep(500000);  //to prevent the main thread from exiting
  }

Вот вывод кода выше:

1: 737, Thread:main Timestamp: 1546369697211
2: 737, Thread:main Timestamp: 1546369697211
1: 747, Thread:main Timestamp: 1546369697235
2: 747, Thread:main Timestamp: 1546369697235
3: 737, Thread: RxNewThreadScheduler-1 Timestamp: 1546369717238
4: 737 Boeing, Thread: RxNewThreadScheduler-1 Timestamp: 1546369717238
3: 747, Thread: RxNewThreadScheduler-1 Timestamp: 1546369717239
4: 747 Boeing, Thread: RxNewThreadScheduler-1 Timestamp: 1546369717239
4: [737 Boeing, 747 Boeing], Thread: RxNewThreadScheduler-1 Timestamp: 1546369717239
Reduce result is: [737 Boeing, 747 Boeing]

Мой вопрос: работает ли 737 в потоке RxNewThreadScheduler-1 и спит ли этот поток дляЧерез 20 секунд поток 747 должен был продолжить выполнение, верно?Но в выводе мы можем видеть, что вывод, соответствующий '3' для 747, приходит позже, чем для 737. Кроме того, этот оператор вызывается в том же потоке, что и 737.

Точнее, почему«3» для 747 работает только после пробуждения потока для 737?Казалось бы, 747 пытается использовать те же потоки, что и 737. Насколько я понимаю, поток должен был выглядеть так:

747 испускается в потоке B и продолжает выполнение. Операции, соответствующие 747, должны были быть выполнены ранее в другом потоке.

Может кто-нибудь объяснить мне это поведение?

...