Кэширование предметов от наблюдаемых до первой подписки для решения условий гонки - PullRequest
0 голосов
/ 17 января 2019

Я использую Reactive Extensions (RxJava 2) для выполнения вызова RPC на устройство Bluetooth, что приводит к входящему потоку данных, который я впоследствии анализирую, также используя Rx. Полученный API является простым Flowable<DownloadedRecord>. Для этого я использую API-интерфейс Rx библиотеки Sweetblue для Android .

.

Моя проблема в том, что существует условие состязания между «запросом» устройства на начало потоковой передачи и своевременной подпиской на поток, чтобы убедиться, что ни один пакет не пропущен.

Я использую Completable, чтобы сначала выполнить вызов RPC для запроса начала потоковой передачи данных, andThen( readRecords ). Кажется, что возникает состояние гонки, когда Sweetblue испускает некоторые пакеты, прежде чем readRecords успел подписаться на этот поток, тем самым «сломав» readRecords.

Чтобы абстрагироваться от этого конкретного сценария, возьмите следующий отдельный код:

val numbers = PublishSubject.create<Int>()

var currentTotal = 0
val sumToTen = numbers
    .doOnNext { currentTotal += it }
    .doOnNext { println( "Produced $it" ) }
    .takeUntil { currentTotal >= 10 }
    .doOnComplete { println( "Produced a total of $currentTotal." ) }

Completable.fromAction { numbers.onNext( 9 ) } ) // Mimic race condition.
    .andThen( sumToTen )
    .subscribe { println( "Observed: $it, Current total: $currentTotal" ) }

numbers.onNext( 1 )

Вызов numbers.onNext( 9 ) имитирует состояние гонки. sumToTen этот номер никогда не соблюдается, поскольку на sumToTen подписывается только следующая строка. Таким образом, поток никогда не завершается.

После некоторых исследований я понимаю, что могу «решить» эту проблему, используя replay и connect.

val numbers = PublishSubject.create<Int>()

var currentTotal = 0
val sumToTen = numbers
    .doOnNext { currentTotal += it }
    .doOnNext { println( "Produced $it" ) }
    .takeUntil { currentTotal >= 10 }
    .doOnComplete { println( "Produced a total of $currentTotal." ) }
    .replay( 1 ) // Always replay last item upon subscription.

Completable.fromAction { sumToTen.connect() }
    .andThen( Completable.fromAction { numbers.onNext( 9 ) } )
    .andThen( sumToTen )
    .subscribe { println( "Observed: $it, Current total: $currentTotal" ) }

numbers.onNext( 1 )

Теперь поток sumToTen завершается, поскольку при первом подключении к sumToThen до «начала потоковой передачи данных» (onNext( 9 )) этот поток подписывается на numbers, таким образом, возникают предполагаемые побочные эффекты (currentTotal). Но , '9' наблюдается только тогда, когда буфер replay достаточно большой (в данном случае это так). Например, замена replay( 1 ) на publish сделает поток завершенным («Всего произведено 10»), но не будет наблюдать «9».

Я не полностью удовлетворен этим решением по двум причинам:

  1. Это просто сводит к минимуму вероятность возникновения состояния гонки. Размер буфера replay может быть произвольным.
  2. Это всегда будет хранить указанное количество элементов в replay в памяти, даже если намерение состоит в том, чтобы делать это только до подписки.

Практически говоря, ни одна из этих проблем не является реальной проблемой, но с точки зрения удобства обслуживания это взлетает на глаз: код не дает четкого представления о намерениях .

Есть ли лучший способ справиться с этим сценарием? E.g.:

  • Оператор replay, который воспроизводит данные только для одного абонента (таким образом, удаляет кэш один раз в первый раз).
  • Совершенно иной подход, чем тот, который я исследовал здесь с publish/connect?
...