Я хочу разветвить логику в зависимости от количества выбросов из восходящего потока.
Если быть точным, я хочу:
- Ничего не произойдет, если в восходящем направлении пусто
- Одна ветвь для запуска, когда восходящий испускает только одно значение, а затем завершает
- Одна ветвь для запуска, когда восходящий испускает более одного значения, а затемзавершает.
Я ломал голову над тем, как подойти к этому, и я придумал что-то, что работает, но кажется ужасно многословным.Мне интересно, есть ли более простой способ сделать это.
Это решение основано на операторе valve из проекта RxJava2Extensions.
Схема решения следующая:
- Используйте
publish(foo)
, чтобы подписаться несколько раз на восходящий поток - Используйте
merge
для двух ветвей логики - Для «более одной логики излучения» используйтесначала закройте
valve
и откройте его при втором выбросе, сломайте клапан, если не было или только один выброс.Под ломанием клапана я имею в виду прекращение управления Publisher
- Для «только одной логики эмиссии» используйте изначально закрытое
valve
.Используйте ambArray
, чтобы либо сломать клапан при отсутствии или повторном выбросе, либо открыть клапан, когда был ровно один выброс.
Так что, похоже, это работает, хотя мои опасения таковы:
- Смотрится на то, что делает.Может ли это быть закодировано проще и понятнее?
- Весь бизнес по взлому клапанов вызовет исключение, которое я просто глотаю, но могут быть и другие исключения, не связанные с клапанами, которые я, вероятно, должен различить здесь и позволить им распространяться вниз попоток.[РЕДАКТИРОВАТЬ] Разрыв клапана имеет важное значение, так что клапан для логики единичных выбросов не накапливает выбросы, предназначенные для логики множественных выбросов, и не забирает память таким образом [/ EDIT]
Вот код:
Flowable.just(1,2,3,4,5) // +1 emissions
//Flowable.just(1) // 1 emission
//Flowable.empty() // 0 emissions
.publish( //publish so that you get connectableFlowable inside
f ->
Flowable.merge( //merge for the logic split
f.compose(
valve(f.scan(0, (sum, i) -> sum + 1) //scan to emit progressive count
.filter(i -> i > 1) //filter for when count > 1
.take(1) //take just first such count
.concatMap(__ -> Flowable.<Boolean>never().startWith(true)) //and open the valve
.switchIfEmpty(Flowable.empty()), //break the valve if there was just 1 element
false) //start with the valve closed
)
.onErrorResumeNext(Flowable.empty()) //swallow the broken valve exception???
.map(__ -> "more than one elements!"), //here goes logic for +1 emissions
f.compose(
valve(
Flowable.ambArray(
f.scan(0, (sum, i) -> sum + 1) //do progressive counts
.switchIfEmpty(Flowable.never()) //if there was no elements then never end this guy
.filter(i -> i > 1) //filter > 1
.take(1) //take just first one
.concatMap(
__ -> Flowable.<Boolean>empty()) //if there was > 1 element then emit empty and break the valve so we
//don't accumulate byte arrays that are meant for multipart upload
,
f.count() //count the stream
.map(c -> c == 1) //open valve if the count was 1
.toFlowable()
.concatWith(Flowable.never()) //and keep the stream opened forever
),
false
)
)
.onErrorResumeNext(Flowable.empty())
.map(i -> "just one element") //here goes logic for just one emission
)
)
.doOnNext(i -> System.out.println("haya! " + i))
.blockingSubscribe();
}