Блокировка операции записи RxAndroidBle - PullRequest
0 голосов
/ 19 февраля 2019

Как выполнить блокировку записи в Android с помощью RxAndroidBle.Только если операция записи прошла успешно, должна быть выполнена следующая команда.

protected void doWriteBytes(UUID characteristic, byte[] bytes) {
    final Disposable disposable = connectionObservable
            .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
            .observeOn(AndroidSchedulers.mainThread())
            .retry(BT_RETRY_TIMES_ON_ERROR)
            .subscribe(
                    value -> {
                        Timber.d("Write characteristic %s: %s",
                                BluetoothGattUuid.prettyPrint(characteristic),
                                byteInHex(value));
                    },
                    throwable -> onError(throwable)
            );

    compositeDisposable.add(disposable);
}


protected void test() {
  // blocking write bytes
  doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12});

  // following command should be only performed if doWriteBytes is successful executed
  foo();

  // blocking write bytes
  doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1});

  bar();
}

Я знаю, что подписаться и onComplete, но также возможно обойтись без этих методов?

Фон я хочупереопределить метод теста в нескольких разных подклассах, чтобы я мог выполнять различные команды doWriteBytes (например, команды ACK) для отправки некоторых байтов на устройство Bluetooth, но я должен быть уверен, что следующая команда будет выполнена только в том случае, если команда ACK успешнаsend.

Возможно, это скорее проблема RxJava2, но я не совсем с ней знаком.

Редактировать:

Спасибо за ваш ответ @Dariusz Seweryn.Извините, мой вопрос, вероятно, был не совсем ясен.Я попытаюсь конкретизировать его.

Я хочу написать исходный код как обычную функцию в test () для абстрагирования реализаций RxJava2.Единственное отличие состоит в том, что doWriteBytes и другие операции Bluetooth (уведомление, чтение) должны выполняться через RxAndroidBle.То, что мне нужно записать на устройство Bluetooth, зависит от байтов уведомлений или какого-либо другого алгоритма в методе test ().Кроме того, я хочу перезаписать метод test () для реализации другого потока связи Bluetooth для совершенно другого устройства Bluetooth.Всегда важно, чтобы операции Bluetooth обрабатывались последовательно.

Теперь у меня есть три идеи:

1) Моя первая идея - реализовать блокировку всех операций RxAndroidBle, поэтому я могу использовать простыенапример, циклы.

2) Моя вторая идея - динамически добавлять (concat?) во время выполнения наблюдения к другому в методе test (), который последовательно обрабатывает, но мне всегда нужны возвращаемые значения из предыдущих наблюдений?

3) Моя третья идея - объединить операцию записи / уведомления / записи как метод, который я могу вызвать в методе test ().Операция должна записать байты в признак A, затем дождаться уведомления о характеристике B, выполнить некоторую обработку с полученными байтами и снова записать в характеристику C. Но что записывается или как процесс уведомления должен динамически выполняться во время выполнения теста() добавлен метод.

Может быть, есть изящное решение для моей проблемы в RxJava2 или это вообще невозможно?

Edit2:

Я пытался реализовать все три идеи, но, к сожалению, у меня ничего не получилось.

1)

connectionObservable
                .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .retry(BT_RETRY_TIMES_ON_ERROR)
                .blockingSubscribe(
                        value -> {
                            Timber.d("Write characteristic %s: %s",
                                    BluetoothGattUuid.prettyPrint(characteristic),
                                    byteInHex(value));
                            processBtQueue();
                        },
                        throwable -> onError(throwable)
                );

Это всегда блокирует даже при успехе?Я должен выпустить это где-нибудь?Кроме того, метод возвращает void, а не одноразовый, но тогда я не могу его утилизировать.

2) Я борюсь с этой идеей.С какой наблюдаемой мне следует согласиться, если я не знаю начальную наблюдаемую?ConnectionObserable не работает, потому что он содержит RxBleConnection.Вторая проблема заключается в том, что значения после операции Bluetooth являются классами Java Object !?Я должен бросить это каждый раз?У вас есть пример того, как я могу связать, например, операцию записи Bluetooth в результат уведомления Bluetooth?

3) Проблема этой идеи в том, что я не знаю, как динамически добавлять во время выполнения обрабатывающую часть к уведомлению вне части подписки RxJava?

У меня есть рабочее решение для идеи № 3

protected Observable<byte[]> doWriteNotify(UUID characteristic, byte[] bytes, Observable<byte[]> notificationObservable) {
    Observable observable = connectionObservable
            .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retry(BT_RETRY_TIMES_ON_ERROR)
        .flatMap( writeBytes -> notificationObservable)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retry(BT_RETRY_TIMES_ON_ERROR);

    compositeDisposable.add(observable.subscribe());

    return observable;
}

Кстати.я должен создать отдельные потоки в stackoverflow с этими вопросами?

Если это поможет, вы можете найти мой экспериментальный исходный код здесь .

1 Ответ

0 голосов
/ 19 февраля 2019

Я знаю, подписаться и onComplete, но также можно обойтись без этих методов?

Дано:

Completable foo() { ... }

Completable bar() { ... )

Можно сделать:

Disposable testDisposable = connectionObservable
                                .flatMapCompletable(connection ->
                                    connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12}).ignoreElement()
                                        .andThen(foo())
                                        .andThen(connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1}).ignoreElement())
                                        .andThen(bar())
                                )
                                .subscribe(
                                     () -> { /* completed */ },
                                     throwable -> { /* error happened */ }
                                )

Закрыв вышеупомянутое с одним .subscribe(), можно контролировать, сколько соединений завершит поток.В приведенном выше примере, если соединение преждевременно завершится во время первой записи - все последующие операции (foo(), запись, bar()) вообще не будут выполняться.

Редактировать:

Все ваши идеи могут сработать - вы можете их опробовать.

Может быть, есть изящное решение моей проблемы в RxJava2 или оно вообще невозможно?

Для классов Observable / Single / Completable есть .blocking*() функции, если они вам действительно нужны.Однако будьте осторожны - вы можете столкнуться с некоторыми трудными для отладки проблемами в зависимости от вашей реализации из-за введения большего состояния в ваше приложение.

...