Как избежать внешних звонков, чтобы изменить поток потока - PullRequest
4 голосов
/ 22 марта 2019

Проблема

Я столкнулся с проблемой при использовании сторонней библиотеки, которая порождает новый поток и возвращает значения, используя этот новый поток.Он изменяет поток всей цепочки вниз по течению.

Код

Код будет примерно таким, он вызывает вариант использования и подписывается на определенный планировщик:

someUseCase()
  .subscribeOn(Schedulers.io())
  .subscribe(value -> print("value emitted: " + value))

и внутри варианта использования будет много операций, в том числе вызов сторонней библиотеки:

Completable.defer(() -> {
    print("Operations on the chain before calling third party lib");
    return Completable.complete();
})
.andThen(callThirdPartyLib())
.flatMap((Function<String, ObservableSource<String>>) s -> {
    print("Operations on the chain after calling third party lib");
    return Observable.just(s);
})

Текущий вывод

Журналы будут выглядеть так, обратите внимание, как потокизменения:

[Thread: RxCachedThreadScheduler-1] Operations on the chain before calling third party lib
[Thread: ThirdPartyLibThread] Operations on the chain after calling third party lib
[Thread: ThirdPartyLibThread] value emitted: third party value new thread

Ожидаемый вывод

Я бы хотел, чтобы поток продолжал работать в том потоке, на который он был первоначально подписан.

Ожидаемые ответы

Я знаю, что могу это исправить, добавив .observeOn(Schedulers.io()) после вызова третьей стороны lib.Но я не хочу жестко кодировать планировщик внутри класса UseCase.

Я хочу, чтобы пользователь класса выбирал, на какой планировщик он хочет подписаться, без побочных эффектов изменения потоков в середине.Я мог бы запросить планировщик в качестве параметра для класса UseCase, но я ищу другие альтернативы.

Дополнительный код

Так называется сторонняя библиотека lib, используя пример, приведенный в javadocs :

private static Observable<String> callThirdPartyLib() {
    ThirdPartyLib thirdPartyLib = new ThirdPartyLib();
    return Observable.create((ObservableOnSubscribe<String>) emitter -> thirdPartyLib.getData(emitter::onNext))
            //.observeOn(Schedulers.io()) // <-- this fixes it
}

Это для имитации сторонней библиотеки, которая вызывает обратный вызов в другом потоке - этот код нельзя изменить:

private static class ThirdPartyLib {

    void getData(Callback callback) {
        new Thread(() -> callback.onDataReceived("third party value new thread"), "ThirdPartyLibThread").start();
    }

    interface Callback {
        void onDataReceived(String data);
    }
}
...