rxJava2 Single.Just () всегда выполняется в основном потоке. Как заставить его исполниться в другом потоке? - PullRequest
1 голос
/ 04 июня 2019

в этом фрагменте кода я пытаюсь обработать кучу данных, но это не может быть в потоке пользовательского интерфейса, в противном случае возможен ANR. Я думал, что это легко сделать с помощью rxJava2, однако обработка данных всегда выполняется в основном потоке.

Загрузка данных запускается в «докладчике» следующим образом:

void loadHistoricalDataFromFile(String filename){
    view.showProgressDialog();
    addDisposable(
            model.loadHistoricalDataObservable(filename)
                    .subscribeOn(rxSchedulers.runOnBackground())
                    .observeOn(rxSchedulers.mainThread())
                    .subscribe(loadedSuccessfully -> {
                        view.hideProgressDialog();
                        if (loadedSuccessfully){
                            view.showSnackBar(R.string.simulator_loaded_data_success, LENGTH_SHORT);
                        } else {
                            view.showSnackBar(R.string.simulator_loaded_data_fail, LENGTH_INDEFINITE);
                        }
                    }));
}

Как видите, я использовал .subscribeOn(rxSchedulers.runOnBackground())

rxSchedulers.runOnBackground() реализован следующим образом:

public class AppRxSchedulers implements RxSchedulers {


    public static Executor backgroundExecutor = Executors.newCachedThreadPool();
    public static Scheduler BACKGROUND_SCHEDULERS = Schedulers.from(backgroundExecutor);
    public static Executor internetExecutor = Executors.newCachedThreadPool();
    public static Scheduler INTERNET_SCHEDULERS = Schedulers.from(internetExecutor);
    public static Executor singleExecutor = Executors.newSingleThreadExecutor();
    public static Scheduler SINGLE_SCHEDULERS = Schedulers.from(singleExecutor);

    @Override
    public Scheduler runOnBackground() {
        return BACKGROUND_SCHEDULERS;
    }

    @Override
    public Scheduler io() {
        return Schedulers.io();
    }

    @Override
    public Scheduler compute() {
        return Schedulers.computation();
    }

    @Override
    public Scheduler mainThread() {
        return AndroidSchedulers.mainThread();
    }

    @Override
    public Scheduler internet() {
        return INTERNET_SCHEDULERS;
    }

    @Override
    public Scheduler single() {
        return SINGLE_SCHEDULERS;
    }
}

Single.Just () реализован следующим образом

Single<Boolean> loadHistoricalDataObservable(String filename){
    return Single.just(loadHistoricalData(filename));
}

private Boolean loadHistoricalData(String filename){
    boolean successful = false;
    String json = FileUtils.readFileAsStringFromExtRam(filename);
    if (json.length() > 0) {
        Gson gson = new Gson();
        historicPriceList = null;
        historicPriceList = gson.fromJson(json, new TypeToken<List<HistoricPrice>>(){}.getType());
        successful = true;
        Timber.d("Successfully loaded file - recreated %d records", historicPriceList.size());
    } else {
        Timber.d("Failed to load file");
    }

    return successful;
}

Основная проблема заключается в том, что всякий раз, когда я достигаю точки останова в loadHistoricalData(), я вижу, что она запускается в основном потоке. Это обязательно должно быть в другом потоке. Как это возможно?

Ответы [ 2 ]

4 голосов
/ 04 июня 2019

Проблема здесь: Single.just (loadHistoricalData (filename));

Вы немедленно вызываете функцию, а затем передаете ее результат в Single.just ();Вам нужно изменить его на что-то вроде этого:

Single.fromCallable(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return loadHistoricalData(filename);
            }
        });

Так это будет выглядеть так:

Single<Boolean> loadHistoricalDataObservable(String filename){
    return Single.fromCallable(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    return loadHistoricalData(filename);
                }
            });
}
2 голосов
/ 04 июня 2019

Сам оператор Single.just всегда запускается из потока, из которого он вызывается.Вы должны использовать это для запуска ваших операций в другом потоке:

Single.create<Boolean> {
    val data = loadHistoricalData(*****)
    it.onSuccess(data)
}
...