Логика ветвления rxjava2, основанная на количестве выбросов - PullRequest
0 голосов
/ 19 декабря 2018

Я хочу разветвить логику в зависимости от количества выбросов из восходящего потока.

Если быть точным, я хочу:

  1. Ничего не произойдет, если в восходящем направлении пусто
  2. Одна ветвь для запуска, когда восходящий испускает только одно значение, а затем завершает
  3. Одна ветвь для запуска, когда восходящий испускает более одного значения, а затемзавершает.

Я ломал голову над тем, как подойти к этому, и я придумал что-то, что работает, но кажется ужасно многословным.Мне интересно, есть ли более простой способ сделать это.

Это решение основано на операторе valve из проекта RxJava2Extensions.

Схема решения следующая:

  1. Используйте publish(foo), чтобы подписаться несколько раз на восходящий поток
  2. Используйте merge для двух ветвей логики
  3. Для «более одной логики излучения» используйтесначала закройте valve и откройте его при втором выбросе, сломайте клапан, если не было или только один выброс.Под ломанием клапана я имею в виду прекращение управления Publisher
  4. Для «только одной логики эмиссии» используйте изначально закрытое valve.Используйте ambArray, чтобы либо сломать клапан при отсутствии или повторном выбросе, либо открыть клапан, когда был ровно один выброс.

Так что, похоже, это работает, хотя мои опасения таковы:

  1. Смотрится на то, что делает.Может ли это быть закодировано проще и понятнее?
  2. Весь бизнес по взлому клапанов вызовет исключение, которое я просто глотаю, но могут быть и другие исключения, не связанные с клапанами, которые я, вероятно, должен различить здесь и позволить им распространяться вниз попоток.[РЕДАКТИРОВАТЬ] Разрыв клапана имеет важное значение, так что клапан для логики единичных выбросов не накапливает выбросы, предназначенные для логики множественных выбросов, и не забирает память таким образом [/ EDIT]

Вот код:

Flowable.just(1,2,3,4,5) // +1 emissions
    //Flowable.just(1) // 1 emission
    //Flowable.empty() // 0 emissions
            .publish( //publish so that you get connectableFlowable inside
                f ->
                    Flowable.merge( //merge for the logic split
                        f.compose(
                            valve(f.scan(0, (sum, i) -> sum + 1) //scan to emit progressive count
                                   .filter(i -> i > 1) //filter for when count > 1
                                   .take(1) //take just first such count
                                   .concatMap(__ -> Flowable.<Boolean>never().startWith(true))  //and open the valve
                                   .switchIfEmpty(Flowable.empty()), //break the valve if there was just 1 element
                                  false) //start with the valve closed
                        )
                         .onErrorResumeNext(Flowable.empty()) //swallow the broken valve exception???
                         .map(__ -> "more than one elements!"), //here goes logic for +1 emissions
                        f.compose(
                            valve(
                                Flowable.ambArray(
                                    f.scan(0, (sum, i) -> sum + 1) //do progressive counts
                                     .switchIfEmpty(Flowable.never()) //if there was no elements then never end this guy
                                     .filter(i -> i > 1) //filter > 1
                                     .take(1) //take just first one
                                     .concatMap(
                                         __ -> Flowable.<Boolean>empty()) //if there was > 1 element then emit empty and break the valve so we
                                                                          //don't accumulate byte arrays that are meant for multipart upload
                                    ,
                                    f.count() //count the stream
                                     .map(c -> c == 1) //open valve if the count was 1
                                     .toFlowable()
                                     .concatWith(Flowable.never()) //and keep the stream opened forever
                                ),
                                false
                            )
                        )
                         .onErrorResumeNext(Flowable.empty())
                         .map(i -> "just one element") //here goes logic for just one emission
                    )
            )
            .doOnNext(i -> System.out.println("haya! " + i))
            .blockingSubscribe();
}

1 Ответ

0 голосов
/ 20 декабря 2018

Как я и подозревал, я сделал это слишком сложным.Я пришел с таким способом чище и проще решение проблемы:

 public static <U, D> FlowableTransformer<U, D> singleMultipleBranching(
    FlowableTransformer<U, D> singleBranchTransformer,
    FlowableTransformer<U, D> manyBranchTransformer
)
{
    return
        fl ->
            fl.replay( //replay so that you get connectableFlowable inside
                       f -> f.buffer(2)
                             .take(1)
                             .switchMap(
                                 buf -> {
                                     switch (buf.size()) {
                                     case 1:
                                         return f.compose(
                                             singleBranchTransformer);
                                     case 2:
                                         return f.compose(
                                             manyBranchTransformer);
                                     default:
                                         return Flowable.empty();
                                     }
                                 }
                             )
            );

}
...