IO Thread checkout / стратегия выпуска RXJAVA - PullRequest
1 голос
/ 13 апреля 2019

Я использую RX-Java 1.3.8 и для демонстрации проблемы я изменил планировщик ввода-вывода CacheThreadScheduler, чтобы записывать в него оператор, например getting thread with $id, когда рабочий поток берется из очереди, и releasing thread with $id, когда рабочий поток возвращается очередь.

Теперь рассмотрим сценарий, в котором мы хотим сделать асинхронный HTTP-вызов, а затем, используя его результат, сделать еще один асинхронный HTTP-вызов.
Ниже код демонстрирует это
In the example have used executor service to simulate the async-io HTTP call

1. Observable.just("1")
2.          .doOnNext(s -> print("just started"))
3.          .flatMap(one -> {
4.              return Observable.create(sub -> {
5.                  // simulating an async io http request which take 10 sec
6.                  Executors.newSingleThreadExecutor().submit(() -> {
7.                      sleep(10_000);                      
8.                      print("GOT RESPONSE-1");
9.                      sub.onNext("");sub.onCompleted();
10.                 });
11.             });
12.         })
13.         .doOnNext((s) -> print("before moving to io thread"))
14.         .observeOn(Schedulers.io())
15.         .doOnNext((s) -> print("after moving to io thread"))
16.         .flatMap(two -> {
17.             return Observable.create(sub -> {
18.                 Executors.newSingleThreadExecutor().submit(() -> {
19.                         // simulating an async io http request which take 10 sec
20.                     sleep(10_000);
21.                     print("GOT RESPONSE-2");
22.                     sub.onNext("");sub.onCompleted();
23.                 });
24.             });
25.         })
26.         .doOnNext((s) -> print("close to end of chain"))            
27.         .subscribe();

ВЫХОД ПОЛУЧЕН:

1. getting thread worker with id ThreadWorker@1d251891
2. main:just started
3. pool-1-thread-1:GOT RESPONSE-1
4. pool-1-thread-1:before moving to io thread
5. RxCachedThreadScheduler-1:after moving to io thread
6. pool-2-thread-1:GOT RESPONSE-2
7. pool-2-thread-1:close to end of chain
8. releasing thread worker with id ThreadWorker@1d251891 in 2011

Теперь, если мы проанализируем вывод, первое, что он сделал, - это извлечение работника потока из CacheThreadScheduler, что нежелательно, так как мы ожидаем, что работа с потоками должна быть проверена после того, как GOT RESPONSE-1 будет напечатано.

Вопрос1 : Можно ли принудительно заставить нанимателя оформлять заказ только после печати GOT RESPONSE-1?

Второе, на что нужно обратить внимание, это line 8, потоковый работник освобождается после завершения конвейера, ожидается, что потоковый работник должен получить освобождение, как только Executors.newSingleThreadExecutor().submit получит вызов.

Question2 : Можно ли принудительно заставить этот обработчик получить освобождение после вызова Executors.newSingleThreadExecutor().submit.

...