Динамическое переключение между наблюдаемыми в RxJava - PullRequest
0 голосов
/ 14 марта 2019

У меня есть две наблюдаемые: одна излучает ByteArrays, полученные от соединения Bluetooth, другая - результаты периодических запросов к серверу.

fun observeBluetooth():Observable<ByteArray>
fun observeServer():Observable<ByteArray>

Доступно состояние соединения Bluetooth (в виде логической переменной или в виде наблюдаемого)

Мне нужно объединить наблюдаемые с учетом следующих требований:

  1. Если Bluetooth подключен, данные, полученные от applyBluetooth (), должны передаваться (и запросы к серверу не должны выполняться)
  2. Если Bluetooth не подключен, данные, полученные от наблюдающего сервера () должен быть выпущен
  3. Если соединение Bluetooth восстановлено, данные, полученные от applyBluetooth (), должны быть отправлены снова

Как мне это сделать с помощью RxJava / Kotlin?

Ответы [ 2 ]

3 голосов
/ 14 марта 2019

Если у вас есть наблюдаемое isBtConnected, вы можете переключить его:

val isBtConnected: Observable<Boolean> = ...
isBtConnected.switchMap {
    if (it) observeBluetooth()
    else observeServer()
}
0 голосов
/ 14 марта 2019

Похоже, вы можете просто фильтровать на основе состояния Bluetooth.Если это общая атомная переменная:

final AtomicBoolean isConnected = new AtomicBoolean();

Observable.merge(
    observeBluetooth()
       .filter(v -> isConnected.get()),
    observeServer()
       .filter(v -> !isConnected.get())
)
// ... etc.

Если статус также можно наблюдать, он становится немного сложнее, так как вам нужен оператор valve из проекта расширений:

Observable<Boolean> isConnectedSource = ...

Observable<Boolean> shared = isConnectedSource.publish().refCount(2);

Observable.merge(
    observeBluetooth()
       .compose(ObservableTransformers.valve(shared)),
    observeServer()
       .compose(ObservableTransformers.valve(shared.map(v -> !v), false))
)
// ... etc.

Обратите внимание, что valve будет приостанавливать и продолжать буферизацию элементов, пока он закрыт.Если вам нужно отбросить предметы из другой последовательности, вам нужно объединить два подхода:

Observable<Boolean> isConnectedSource = ...

final AtomicBoolean isConnected = new AtomicBoolean();

Disposable status = isConnectedSource.subscribe(v -> isConnected.set(v));

Observable.merge(
    observeBluetooth()
       .filter(v -> isConnected.get()),
    observeServer()
       .filter(v -> !isConnected.get())
)
// ... etc.

status.dispose();
...