Проблема
Я столкнулся с проблемой при использовании сторонней библиотеки, которая порождает новый поток и возвращает значения, используя этот новый поток.Он изменяет поток всей цепочки вниз по течению.
Код
Код будет примерно таким, он вызывает вариант использования и подписывается на определенный планировщик:
someUseCase()
.subscribeOn(Schedulers.io())
.subscribe(value -> print("value emitted: " + value))
и внутри варианта использования будет много операций, в том числе вызов сторонней библиотеки:
Completable.defer(() -> {
print("Operations on the chain before calling third party lib");
return Completable.complete();
})
.andThen(callThirdPartyLib())
.flatMap((Function<String, ObservableSource<String>>) s -> {
print("Operations on the chain after calling third party lib");
return Observable.just(s);
})
Текущий вывод
Журналы будут выглядеть так, обратите внимание, как потокизменения:
[Thread: RxCachedThreadScheduler-1] Operations on the chain before calling third party lib
[Thread: ThirdPartyLibThread] Operations on the chain after calling third party lib
[Thread: ThirdPartyLibThread] value emitted: third party value new thread
Ожидаемый вывод
Я бы хотел, чтобы поток продолжал работать в том потоке, на который он был первоначально подписан.
Ожидаемые ответы
Я знаю, что могу это исправить, добавив .observeOn(Schedulers.io())
после вызова третьей стороны lib.Но я не хочу жестко кодировать планировщик внутри класса UseCase.
Я хочу, чтобы пользователь класса выбирал, на какой планировщик он хочет подписаться, без побочных эффектов изменения потоков в середине.Я мог бы запросить планировщик в качестве параметра для класса UseCase, но я ищу другие альтернативы.
Дополнительный код
Так называется сторонняя библиотека lib, используя пример, приведенный в javadocs :
private static Observable<String> callThirdPartyLib() {
ThirdPartyLib thirdPartyLib = new ThirdPartyLib();
return Observable.create((ObservableOnSubscribe<String>) emitter -> thirdPartyLib.getData(emitter::onNext))
//.observeOn(Schedulers.io()) // <-- this fixes it
}
Это для имитации сторонней библиотеки, которая вызывает обратный вызов в другом потоке - этот код нельзя изменить:
private static class ThirdPartyLib {
void getData(Callback callback) {
new Thread(() -> callback.onDataReceived("third party value new thread"), "ThirdPartyLibThread").start();
}
interface Callback {
void onDataReceived(String data);
}
}