Мне нужно отобразить выбросы RxJava на основе побочного эффекта.
Каждая запись выбросов будет отображаться по следующей формуле: f(x) = const * x + counter
где счетчик равен 0
, когда константные числа четные или -1
когда const нечетное число.
Для каждого излучения счетчик будет увеличиваться.
И const, и излучения являются целыми числами и будут получены из удаленного ресурса.
Установка счетчика в качестве свойства класса, похоже, решает проблему.Но тогда побочный эффект сделает его не защищенным от потоков и не сможет вызываться более одного раза, пока мы не сбросим счетчик.
private var counter = 0
fun process(constSource: Single<Int>, entrySource: Flowable<Long>): Flowable<Long> {
fun mapper(const: Int, entry: Long) = Single.fromCallable {
if (const % 2 != 0) counter = -1
const * entry + counter++
}
return entrySource
.withLatestFrom(
constSource.toFlowable(),
BiFunction { entry: Long, const: Int -> entry to const }
)
.flatMapSingle { (entry, const) -> mapper(const, entry) }
}
@Test
fun `flat map with async side effect`() {
val constSource = Single.fromCallable { 2 }
val entrySource = Flowable.fromIterable((1L..5L))
process(constSource, entrySource)
.toList()
.test()
.await()
// f = const * entry + counter
// where counter starts from -1 when const is an odd number
.assertResult(listOf(2L, 5L, 8L, 11L, 14L))
process(constSource, entrySource)
.toList()
.test()
.await()
.assertResult(listOf(2L, 5L, 8L, 11L, 14L)) // will fail
}
Есть ли лучшее решение для этой проблемы?