Я использую RX-Java 1.3.8
и для демонстрации проблемы я изменил планировщик ввода-вывода CacheThreadScheduler
, чтобы записывать в него оператор, например getting thread with $id
, когда рабочий поток берется из очереди, и releasing thread with $id
, когда рабочий поток возвращается очередь.
Теперь рассмотрим сценарий, в котором мы хотим сделать асинхронный HTTP-вызов, а затем, используя его результат, сделать еще один асинхронный HTTP-вызов.
Ниже код демонстрирует это
In the example have used executor service to simulate the async-io HTTP call
1. Observable.just("1")
2. .doOnNext(s -> print("just started"))
3. .flatMap(one -> {
4. return Observable.create(sub -> {
5. // simulating an async io http request which take 10 sec
6. Executors.newSingleThreadExecutor().submit(() -> {
7. sleep(10_000);
8. print("GOT RESPONSE-1");
9. sub.onNext("");sub.onCompleted();
10. });
11. });
12. })
13. .doOnNext((s) -> print("before moving to io thread"))
14. .observeOn(Schedulers.io())
15. .doOnNext((s) -> print("after moving to io thread"))
16. .flatMap(two -> {
17. return Observable.create(sub -> {
18. Executors.newSingleThreadExecutor().submit(() -> {
19. // simulating an async io http request which take 10 sec
20. sleep(10_000);
21. print("GOT RESPONSE-2");
22. sub.onNext("");sub.onCompleted();
23. });
24. });
25. })
26. .doOnNext((s) -> print("close to end of chain"))
27. .subscribe();
ВЫХОД ПОЛУЧЕН:
1. getting thread worker with id ThreadWorker@1d251891
2. main:just started
3. pool-1-thread-1:GOT RESPONSE-1
4. pool-1-thread-1:before moving to io thread
5. RxCachedThreadScheduler-1:after moving to io thread
6. pool-2-thread-1:GOT RESPONSE-2
7. pool-2-thread-1:close to end of chain
8. releasing thread worker with id ThreadWorker@1d251891 in 2011
Теперь, если мы проанализируем вывод, первое, что он сделал, - это извлечение работника потока из CacheThreadScheduler, что нежелательно, так как мы ожидаем, что работа с потоками должна быть проверена после того, как GOT RESPONSE-1
будет напечатано.
Вопрос1 : Можно ли принудительно заставить нанимателя оформлять заказ только после печати GOT RESPONSE-1
?
Второе, на что нужно обратить внимание, это line 8
, потоковый работник освобождается после завершения конвейера, ожидается, что потоковый работник должен получить освобождение, как только Executors.newSingleThreadExecutor().submit
получит вызов.
Question2 : Можно ли принудительно заставить этот обработчик получить освобождение после вызова Executors.newSingleThreadExecutor().submit
.