Плоская карта с побочным эффектом - PullRequest
0 голосов
/ 25 декабря 2018

Мне нужно отобразить выбросы RxJava на основе побочного эффекта.

Каждая запись выбросов будет отображаться по следующей формуле: f(x) = const * x + counter где счетчик равен 0, когда константные числа четные или -1 когда const нечетное число.

Для каждого излучения счетчик будет увеличиваться.

И const, и излучения являются целыми числами и будут получены из удаленного ресурса.

Установка счетчика в качестве свойства класса, похоже, решает проблему.Но тогда побочный эффект сделает его не защищенным от потоков и не сможет вызываться более одного раза, пока мы не сбросим счетчик.

private var counter = 0

fun process(constSource: Single<Int>, entrySource: Flowable<Long>): Flowable<Long> {
  fun mapper(const: Int, entry: Long) = Single.fromCallable {
    if (const % 2 != 0) counter = -1
    const * entry + counter++
  }
  return entrySource
    .withLatestFrom(
      constSource.toFlowable(),
      BiFunction { entry: Long, const: Int -> entry to const }
    )
    .flatMapSingle { (entry, const) -> mapper(const, entry) }
}

@Test
fun `flat map with async side effect`() {
  val constSource = Single.fromCallable { 2 }
  val entrySource = Flowable.fromIterable((1L..5L))

  process(constSource, entrySource)
    .toList()
    .test()
    .await()
    // f = const * entry + counter
    // where counter starts from -1 when const is an odd number
    .assertResult(listOf(2L, 5L, 8L, 11L, 14L))

  process(constSource, entrySource)
    .toList()
    .test()
    .await()
    .assertResult(listOf(2L, 5L, 8L, 11L, 14L)) // will fail
}

Есть ли лучшее решение для этой проблемы?

1 Ответ

0 голосов
/ 26 декабря 2018

Хотя вы можете заблокировать какой-либо произвольный объект во время чтения / записи этого счетчика, вы можете использовать более простой и функциональный подход.

Наивное решение заключается в использовании

Observables.combineLatest(
    Observable.range(0, Int.MAX_VALUE), 
    source
) { index, item -> const * item - (index and 1) }

...при условии, что вы используете RxKotlin .(Это упрощает лямбду, и нам не нужно добавлять SAM-преобразование .)

, но это не сработает, потому что Observable.range будет OOM, так как он сразу попытаетсяи выдает все значения от 0 до MAX_VALUE, поэтому вместо этого мы хотим испускать только по требованию, поэтому мы можем использовать Flowable вместо этого, и он должен работать просто отлично.

Flowables.combineLatest(
    Flowable.range(0, Int.MAX_VALUE), 
    source.toFlowable(BackPressureStrategy.BUFFER)
) { index, item -> const * item - (index and 1) }

Игнорироватьнеясный побитовый оператор, я просто добавляю его здесь для краткости и потому что я не знаю, для чего предназначен ваш код, или как бы я назвал этот метод, который возвращает -1 для нечетного числа, делайте все, что наиболее читабельно.

...