Похоже, вы можете просто фильтровать на основе состояния Bluetooth.Если это общая атомная переменная:
final AtomicBoolean isConnected = new AtomicBoolean();
Observable.merge(
observeBluetooth()
.filter(v -> isConnected.get()),
observeServer()
.filter(v -> !isConnected.get())
)
// ... etc.
Если статус также можно наблюдать, он становится немного сложнее, так как вам нужен оператор valve
из проекта расширений:
Observable<Boolean> isConnectedSource = ...
Observable<Boolean> shared = isConnectedSource.publish().refCount(2);
Observable.merge(
observeBluetooth()
.compose(ObservableTransformers.valve(shared)),
observeServer()
.compose(ObservableTransformers.valve(shared.map(v -> !v), false))
)
// ... etc.
Обратите внимание, что valve
будет приостанавливать и продолжать буферизацию элементов, пока он закрыт.Если вам нужно отбросить предметы из другой последовательности, вам нужно объединить два подхода:
Observable<Boolean> isConnectedSource = ...
final AtomicBoolean isConnected = new AtomicBoolean();
Disposable status = isConnectedSource.subscribe(v -> isConnected.set(v));
Observable.merge(
observeBluetooth()
.filter(v -> isConnected.get()),
observeServer()
.filter(v -> !isConnected.get())
)
// ... etc.
status.dispose();