Где я должен поставить onBackPressureBuffer (n) в цепочке подписки RxJava? - PullRequest
1 голос
/ 16 марта 2019

Я пытаюсь исправить существующую библиотеку React Native act-native-ble-plx , добавив onBackPressureBuffer () в существующий код Java .

Я знаюэто уродливо, но у меня нет времени, чтобы отправить PR прямо сейчас, и есть нерешенная проблема , которая может решить проблему.Я делаю это потому, что генератор событий работает на частоте 200 Гц.Мне нужен безопасный способ буферизации элементов на нативной стороне, пока они потребляются в своем собственном темпе на стороне JavaScript.

Таким образом, код выглядит следующим образом:

       final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
            @Override
            public Observable<Observable<byte[]>> call() {
                int properties = gattCharacteristic.getProperties();
                BluetoothGattDescriptor cccDescriptor = gattCharacteristic
                        .getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
                NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
                        : NotificationSetupMode.COMPAT;
                if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
                    return connection.setupNotification(gattCharacteristic, setupMode);
                }

                if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
                    return connection.setupIndication(gattCharacteristic, setupMode);
                }

                return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
            }
        }).onBackpressureBuffer(1000)  <---- Here is my modification
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
            @Override
            public Observable<byte[]> call(Observable<byte[]> observable) {
                return observable;
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                promise.resolve(null);
                transactions.removeSubscription(transactionId);
            }
        }).subscribe(new Observer<byte[]>() {
            @Override
            public void onCompleted() {
                promise.resolve(null);
                transactions.removeSubscription(transactionId);
            }

            @Override
            public void onError(Throwable e) {
                errorConverter.toError(e).reject(promise);
                transactions.removeSubscription(transactionId);
            }

            @Override
            public void onNext(byte[] bytes) {
                characteristic.logValue("Notification from", bytes);
                WritableArray jsResult = Arguments.createArray();
                jsResult.pushNull();
                jsResult.pushMap(characteristic.toJSObject(bytes));
                jsResult.pushString(transactionId);
                sendEvent(Event.ReadEvent, jsResult);
            }
        });

Моя проблемачто даже с этим дополнением у меня возникают исключения MissingBackPressure.

Я пробовал onBackPressureDrop (), и у меня точно такое же поведение.Поэтому я предполагаю, что делаю это неправильно, но не могу понять, почему прямо сейчас.

Любая помощь приветствуется.

1 Ответ

0 голосов
/ 21 марта 2019

Как вы уже сказали, вы столкнулись с проблемой с библиотекой react-native, и приведенный выше код ранее выдавал MissingBackpressureException.

Из Javadoc из .onBackpressureDrop() (выделение жирным шрифтом):

Указывает Наблюдателю, который испускает предметы быстрее, чем его наблюдатель может потреблять, чтобы выбросить, а не испустить те предметы, которые его наблюдатель не готов наблюдать.

Если количество запросов в нисходящем направлении достигает 0, то наблюдаемое будет воздерживаться от вызова {@code onNext}, пока наблюдатель снова не вызовет {@code request (n)} для увеличения количества запросов.

Противодавление:
Оператор распознает противодавление из нисходящего потока и использует источник {@code Observable} неограниченным образом (т. Е. Не применяя противодавление к нему).
Планировщик:
{@ code onBackpressureDrop} не работает по умолчанию для определенного {@link Scheduler}.

Вы можете видеть, что следующие операторы в.flatMap(), .doOnUnsubscribe() и .subscribe().

От Javadoc .flatMap() относительно противодавления:

Противодавление: Оператор учитывает противодавление от нисходящего потока.Внешний {@code Observable} потребляется в неограниченном режиме (т.е. к нему не применяется противодавление).Внутренние {@code Observable} ожидают противодавления;в случае нарушения оператор может сигнализировать {@code MissingBackpressureException}.

Javadoc .doOnUnsubscribe():

Противодавление: {@ code doOnUnsubscribe} не взаимодействует с запросами обратного давления или доставкой значений;Противодавление сохраняется между его восходящим и нисходящим потоком.

И .subscribe():

Противодавление: Операторпотребляет источник {@code Observable} неограниченно (т. е. к нему не применяется обратное давление).

Как вы можете видеть, ни один из операторов ниже .onBackpressure*() не применяет противодавлениев теме.Вам нужно будет добавить оператор, который делает это сразу после .onBackpressure*().Одним из таких операторов является .observeOn(Scheduler)

Javadoc .observeOn():

Обратное давление: Этот оператор учитывает обратное давление из нисходящего потока и ожидает его от источника {@ Code Observable},Нарушение этого ожидания приведет к {@code MissingBackpressureException}.Это самый распространенный оператор, где появляется исключение;ищите источники в цепочке, которые не поддерживают обратное давление, такие как {@code interval}, {@code timer}, {code PublishSubject} или {@code BehaviorSubject} и применяйте любой из операторов {@code onBackpressureXXX} до применения самого приложения {@code наблюдаем}.

Таким образом, работающий код может выглядеть следующим образом:

final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
    @Override
    public Observable<Observable<byte[]>> call() {
        int properties = gattCharacteristic.getProperties();
        BluetoothGattDescriptor cccDescriptor = gattCharacteristic
                .getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
        NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
                : NotificationSetupMode.COMPAT;
        if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
            return connection.setupNotification(gattCharacteristic, setupMode);
        }

        if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
            return connection.setupIndication(gattCharacteristic, setupMode);
        }

        return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
    }
})
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
    @Override
    public Observable<byte[]> call(Observable<byte[]> observable) {
        return observable;
    }
})
.doOnUnsubscribe(new Action0() {
    @Override
    public void call() {
        promise.resolve(null);
        transactions.removeSubscription(transactionId);
    }
})
.onBackpressureBuffer(1000) // <---- Here is my modification
.observeOn(Schedulers.trampoline()) // <---- an operator that does backpressure the above
.subscribe(new Observer<byte[]>() {
    @Override
    public void onCompleted() {
        promise.resolve(null);
        transactions.removeSubscription(transactionId);
    }

    @Override
    public void onError(Throwable e) {
        errorConverter.toError(e).reject(promise);
        transactions.removeSubscription(transactionId);
    }

    @Override
    public void onNext(byte[] bytes) {
        characteristic.logValue("Notification from", bytes);
        WritableArray jsResult = Arguments.createArray();
        jsResult.pushNull();
        jsResult.pushMap(characteristic.toJSObject(bytes));
        jsResult.pushString(transactionId);
        sendEvent(Event.ReadEvent, jsResult);
    }
});
...