RxAndroid NEWBIE - Применение Schedulers.io для запуска runnables вручную - PullRequest
0 голосов
/ 11 сентября 2018

Как вручную запустить Runnables в Schedulers.io при выполнении асинхронного списка задач.

Все вызовы наших веб-сервисов будут делегированы в интерфейсе Runnable, поэтому все вызовы schedule поступили со всех нашихвеб-запросы.

В настоящее время мы пытаемся найти временное решение для использования RxAndroid во всех наших задачах webServices.

В качестве примера рассмотрим следующий фрагмент:

public class SchedulerIO implements Scheduler {

    private io.reactivex.Scheduler ioScheduler;
    private CompositeDisposable compositeDisposable;


    public SchedulerIO(@Named("IOSCHEDULER") io.reactivex.Scheduler ioScheduler) {
        this.ioScheduler = ioScheduler;
        this.compositeDisposable = new CompositeDisposable();
        this.ioScheduler.start();

    }

    @Override
    public void schedule(Runnable runnable) {
        schedule(runnable , null);
    }

    public void schedule(Runnable runnable  , Scheduler.Callback callback){
        this.compositeDisposable.add(execute(runnable , callback));

    }

    public Disposable execute(Runnable runnable , Scheduler.Callback callback){
        return Observable.just(runnable)
                .observeOn(ioScheduler)
                .doOnComplete(callback::onComplete)
                .subscribeOn(ioScheduler)
                .subscribe();
    }



    @Override
    public void cancel() {
        if(compositeDisposable != null && !compositeDisposable.isDisposed()){
            compositeDisposable.dispose();
        }
    }
}

К сведению: интерфейс Scheduler является частью существующего кода, мы избегаем значительных изменений кода, поскольку существующая кодовая база ExecutorScheduler имеет то же поведение, но другую реализацию.Ниже приведен фрагмент кода:

public class ExecutorScheduler implements Scheduler {

    private final ExecutorService executorService;

    public ExecutorScheduler(ExecutorService executorService) {
        this.executorService = executorService;
    }

    @Override
    public void schedule(Runnable runnable) {
        schedule(Arrays.asList(runnable), null);
    }

    @Override
    public void cancel() {


    }

    public void schedule(final List<Runnable> runnables, final Callback callback) {
        CompletableFuture<?>[] futureList = StreamSupport.stream(runnables)
                .map(task -> CompletableFuture.runAsync(task, executorService))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futureList).thenRun(() -> {
                    if (callback != null) callback.onComplete();
                }
        );
    }

}

Временно мы делаем это, чтобы избежать огромного изменения кода в существующем коде. Мы хотим заменить ExecutorService на RxAndroid Scheduler, используя Schedulers.io

Это правильная реализация для RxAndroid в решении асинхронных задач?

Имеет ли онлюбое влияние, так как эта новая реализация фокусируется только на io, не более и не менее.

...