Я использую Reactive Extensions (RxJava 2) для выполнения вызова RPC на устройство Bluetooth, что приводит к входящему потоку данных, который я впоследствии анализирую, также используя Rx. Полученный API является простым Flowable<DownloadedRecord>
. Для этого я использую API-интерфейс Rx библиотеки Sweetblue для Android .
.
Моя проблема в том, что существует условие состязания между «запросом» устройства на начало потоковой передачи и своевременной подпиской на поток, чтобы убедиться, что ни один пакет не пропущен.
Я использую Completable
, чтобы сначала выполнить вызов RPC для запроса начала потоковой передачи данных, andThen( readRecords )
. Кажется, что возникает состояние гонки, когда Sweetblue испускает некоторые пакеты, прежде чем readRecords
успел подписаться на этот поток, тем самым «сломав» readRecords
.
Чтобы абстрагироваться от этого конкретного сценария, возьмите следующий отдельный код:
val numbers = PublishSubject.create<Int>()
var currentTotal = 0
val sumToTen = numbers
.doOnNext { currentTotal += it }
.doOnNext { println( "Produced $it" ) }
.takeUntil { currentTotal >= 10 }
.doOnComplete { println( "Produced a total of $currentTotal." ) }
Completable.fromAction { numbers.onNext( 9 ) } ) // Mimic race condition.
.andThen( sumToTen )
.subscribe { println( "Observed: $it, Current total: $currentTotal" ) }
numbers.onNext( 1 )
Вызов numbers.onNext( 9 )
имитирует состояние гонки. sumToTen
этот номер никогда не соблюдается, поскольку на sumToTen
подписывается только следующая строка. Таким образом, поток никогда не завершается.
После некоторых исследований я понимаю, что могу «решить» эту проблему, используя replay
и connect
.
val numbers = PublishSubject.create<Int>()
var currentTotal = 0
val sumToTen = numbers
.doOnNext { currentTotal += it }
.doOnNext { println( "Produced $it" ) }
.takeUntil { currentTotal >= 10 }
.doOnComplete { println( "Produced a total of $currentTotal." ) }
.replay( 1 ) // Always replay last item upon subscription.
Completable.fromAction { sumToTen.connect() }
.andThen( Completable.fromAction { numbers.onNext( 9 ) } )
.andThen( sumToTen )
.subscribe { println( "Observed: $it, Current total: $currentTotal" ) }
numbers.onNext( 1 )
Теперь поток sumToTen
завершается, поскольку при первом подключении к sumToThen
до «начала потоковой передачи данных» (onNext( 9 )
) этот поток подписывается на numbers
, таким образом, возникают предполагаемые побочные эффекты (currentTotal
). Но , '9' наблюдается только тогда, когда буфер replay
достаточно большой (в данном случае это так). Например, замена replay( 1 )
на publish
сделает поток завершенным («Всего произведено 10»), но не будет наблюдать «9».
Я не полностью удовлетворен этим решением по двум причинам:
- Это просто сводит к минимуму вероятность возникновения состояния гонки. Размер буфера
replay
может быть произвольным.
- Это всегда будет хранить указанное количество элементов в
replay
в памяти, даже если намерение состоит в том, чтобы делать это только до подписки.
Практически говоря, ни одна из этих проблем не является реальной проблемой, но с точки зрения удобства обслуживания это взлетает на глаз: код не дает четкого представления о намерениях .
Есть ли лучший способ справиться с этим сценарием? E.g.:
- Оператор
replay
, который воспроизводит данные только для одного абонента (таким образом, удаляет кэш один раз в первый раз).
- Совершенно иной подход, чем тот, который я исследовал здесь с
publish/connect
?