Как удалить источник при возникновении ошибки в zip-операторе в RxJava2? - PullRequest
0 голосов
/ 19 мая 2019

У меня есть несколько сенсорных входов от различных устройств, обернутых в Observables.Я хочу взять среднее значение этих значений датчика, поэтому оператор Zip кажется подходящим.Теперь проблема в том, что, когда соединение с одним из устройств не удается, я не хочу, чтобы все это просто разрывалось.Стандартное поведение состоит в том, что, если источник выдает ошибку, все источники прекратят работу так же, как и наблюдаемый из самого оператора zip.Есть две проблемы с этим стандартным поведением: 1) Когда источник выдает ошибку, другие источники прерываются, IE теряет соединение.Оператор defer должен затем восстановить это соединение, что занимает некоторое время.Я хочу перейти без проблем, когда устройство удаляется.2) Выполнение .onErrorResumeNext () на отказавшем источнике не вариант, так как для этого снова нужен источник.Выполнение Observable.just (0) .repeat () недопустимо, поскольку не принято делать особые исключения для значений 0 при взятии среднего значения.

Поведение, которое я хочу достичь, состоит в том, чтобы просто просто остановитьслушая наблюдаемое, которое дает ошибку, и просто продолжайте с теми, которые все еще работают.Кто-нибудь получил какие-либо идеи о том, как это сделать?

РЕДАКТИРОВАТЬ: мне удалось сохранить работу системы, но не так, как я хочу.Это проиллюстрировано следующим сегментом кода:

public FusedHeartRateSensor(HeartRateSensor... devices) {
    super("FusedHeartRateSensor", IoTType.TYPE_HEART_RATE_SENSOR, new FusedConnector(), devices);
    init();
}

private void init() {
    Observable.fromIterable(fusedDevices)
            .flatMap(hrs -> hrs.monitorConnection())
            .subscribeOn(Schedulers.newThread())
            .subscribe(
                    state -> {
                        //Als iets verandert met de staat van de geconnecteerde toestellen moet
                        //de deviceChangeObservable op de hoogte gebracht worden
                        refreshConnectedDevices();
                    },
                    throwable -> {
                        Log.e(TAG, throwable.getLocalizedMessage());
                        throwable.printStackTrace();
                    }
            );
}

private void refreshConnectedDevices() {
    Log.i(TAG, "Refreshing enabled fused devices");
    List<HeartRateSensor> devices = fusedDevices.stream()
            .filter(Connectable::isConnected)
            .collect(Collectors.toList());

    deviceChangeObservable.onNext(devices);
}

private Observable<Integer> getMeanObservable(List<Observable<List<Integer>>> observables) {
    System.out.println();
    return Observable.zip(observables, objects -> getMean(Arrays.stream(objects).map(o -> (List<Integer>) o).filter(l -> !l.isEmpty()).map(this::getMean).collect(Collectors.toList())));
}

И немного дальше, когда запрашивается мониторинг ...

@Override
public Observable<Integer> monitorHeartRate() {
    ReplaySubject<Integer> resultObservable = ReplaySubject.create();
    CompositeDisposable disposable = new CompositeDisposable();

    deviceChangeObservable.subscribe(
                    sensors -> {
                        Log.i(TAG, "Switched to new sources");
                        getMeanObservable(sensors.stream()
                                .map(s -> s.monitorHeartRate()
                                        .buffer(1, TimeUnit.SECONDS))
                                .collect(Collectors.toList()))
                                .subscribe(
                                        t -> {
                                            resultObservable.onNext(t);
                                            System.out.println();
                                        },
                                        throwable -> {
                                            throwable.printStackTrace();
                                            System.out.println();

                                        }
                                );
                    },
                    throwable -> {
                        Log.e(TAG, throwable.getLocalizedMessage());
                        throwable.printStackTrace();
                        System.out.println();
                    }
            );

    return resultObservable;
}

Проблема заключается в том, что при сбое датчика воператор zip, все другие наблюдаемые в zip stop.Мое текущее решение состоит в том, чтобы повторить соединение и установить новый zip-файл, наблюдаемый только с наблюдаемыми устройствами, которые все еще подключены.Проблема в том, что это не без проблем.Я также попытался сделать это с отсрочкой и повтором, но это имеет тот же результат, что и код выше.

...