Передача списка в Rx Java Concat и при удалении элемента в onNext или onError вызывает ConcurrentModificationException - PullRequest
0 голосов
/ 21 апреля 2020

У меня есть этот код, который я могу использовать с concat, но мне нужен способ передачи Iterator, чтобы я мог добавлять или удалять элементы, которые невозможны с этим кодом.

Я хочу иметь возможность удалять и добавлять элементы в concat, не вызывая java .util.ConcurrentModificationException Мне нужно сделать это для другой цели, например, если некоторые элементы успешно выполнены, удалите их и повторите попытку других. Или, если токен недействителен, удалите все и обновите sh токен и добавьте все с измененным токеном.

Observable userObservable = getUserObservable(token);
Observable userObservable = getMerchantObservable(token);

List<Observable<?>> observables = Arrays.asList(userObservable, merchantObservable);

Observable.concat(observables)
        .doOnNext(
                result -> observables.remove(0)
                //then if token refreshed call is success then
                //observables.clear()
                //observables.add(updatedUserObservable)
                //observables.add(updateMerchantObservable)
        )
        //.doOnError(error -> System.out.println(error.getMessage()))
        .doOnError(error -> {
            System.out.println(error.getMessage());
            if(((HttpException) error).code() == 401){
                observables.clear();
                observables.add(getRefreshTokenObservable());
            }
        })
        .retryWhen(new RetryWithDelay(5, 2000))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
                result -> System.out.println(result),
                error -> System.out.println("onError called!"));
...