Как вы уже сказали, вы столкнулись с проблемой с библиотекой react-native
, и приведенный выше код ранее выдавал MissingBackpressureException
.
Из Javadoc из .onBackpressureDrop()
(выделение жирным шрифтом):
Указывает Наблюдателю, который испускает предметы быстрее, чем его наблюдатель может потреблять, чтобы выбросить, а не испустить те предметы, которые его наблюдатель не готов наблюдать.
Если количество запросов в нисходящем направлении достигает 0, то наблюдаемое будет воздерживаться от вызова {@code onNext}, пока наблюдатель снова не вызовет {@code request (n)} для увеличения количества запросов.
- Противодавление:
- Оператор распознает противодавление из нисходящего потока и использует источник {@code Observable} неограниченным образом (т. Е. Не применяя противодавление к нему).
- Планировщик:
- {@ code onBackpressureDrop} не работает по умолчанию для определенного {@link Scheduler}.
Вы можете видеть, что следующие операторы в.flatMap()
, .doOnUnsubscribe()
и .subscribe()
.
От Javadoc .flatMap()
относительно противодавления:
Противодавление: Оператор учитывает противодавление от нисходящего потока.Внешний {@code Observable} потребляется в неограниченном режиме (т.е. к нему не применяется противодавление).Внутренние {@code Observable} ожидают противодавления;в случае нарушения оператор может сигнализировать {@code MissingBackpressureException}.
Javadoc .doOnUnsubscribe()
:
Противодавление: {@ code doOnUnsubscribe} не взаимодействует с запросами обратного давления или доставкой значений;Противодавление сохраняется между его восходящим и нисходящим потоком.
И .subscribe()
:
Противодавление: Операторпотребляет источник {@code Observable} неограниченно (т. е. к нему не применяется обратное давление).
Как вы можете видеть, ни один из операторов ниже .onBackpressure*()
не применяет противодавлениев теме.Вам нужно будет добавить оператор, который делает это сразу после .onBackpressure*()
.Одним из таких операторов является .observeOn(Scheduler)
Javadoc .observeOn()
:
Обратное давление: Этот оператор учитывает обратное давление из нисходящего потока и ожидает его от источника {@ Code Observable},Нарушение этого ожидания приведет к {@code MissingBackpressureException}.Это самый распространенный оператор, где появляется исключение;ищите источники в цепочке, которые не поддерживают обратное давление, такие как {@code interval}, {@code timer}, {code PublishSubject} или {@code BehaviorSubject} и применяйте любой из операторов {@code onBackpressureXXX} до применения самого приложения {@code наблюдаем}.
Таким образом, работающий код может выглядеть следующим образом:
final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
@Override
public Observable<Observable<byte[]>> call() {
int properties = gattCharacteristic.getProperties();
BluetoothGattDescriptor cccDescriptor = gattCharacteristic
.getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
: NotificationSetupMode.COMPAT;
if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
return connection.setupNotification(gattCharacteristic, setupMode);
}
if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
return connection.setupIndication(gattCharacteristic, setupMode);
}
return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
}
})
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(Observable<byte[]> observable) {
return observable;
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
})
.onBackpressureBuffer(1000) // <---- Here is my modification
.observeOn(Schedulers.trampoline()) // <---- an operator that does backpressure the above
.subscribe(new Observer<byte[]>() {
@Override
public void onCompleted() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
@Override
public void onError(Throwable e) {
errorConverter.toError(e).reject(promise);
transactions.removeSubscription(transactionId);
}
@Override
public void onNext(byte[] bytes) {
characteristic.logValue("Notification from", bytes);
WritableArray jsResult = Arguments.createArray();
jsResult.pushNull();
jsResult.pushMap(characteristic.toJSObject(bytes));
jsResult.pushString(transactionId);
sendEvent(Event.ReadEvent, jsResult);
}
});