RxJava / RxKotlin: CombinLatest, который уже завершается, если один источник завершает (не все) - PullRequest
0 голосов
/ 05 апреля 2019

В основном у меня есть два Flowables F и G, и я хочу использовать combineLatest на них, но я хочу, чтобы объединенный Flowable уже завершился, если F завершен (даже если G все еще бег).

Вот пример того, чего я хочу достичь с помощью уродливого решения:

fun combineFandGbutTerminateIfFTerminates(F: Flowable<Int>, G: Flowable<Int>) : Flowable<Pair<Int, Int>> {
  val _F = F.share()
  val _G = G.takeUntil(_F.ignoreElements().toFlowable<Nothing>())
  val FandG = Flowables.combineLatest(_F, _G)
  return FandG
}

Мы можем извлечь это из функции и расширения:

fun<T> Flowable<T>.completeWith(other: Flowable<*>) : Flowable<T> {
    return takeUntil(other.ignoreElements().toFlowable<Nothing>())
}

Есть ли лучший способ выразить это?

1 Ответ

0 голосов
/ 08 апреля 2019

Я пришел к следующему решению. Это позволяет объединить одного мастера со многими подчиненными источниками. Если мастер завершает работу, объединенный Flowable завершается. Однако, если ведомый завершает работу перед мастером, распространяется ошибка SlaveCompletedPrematurelyError.

class SlaveCompletedPrematurelyError(message: String) : Throwable(message)

/**
 * Combine this Flowable with one slave source.
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
fun <T, T1, R> Flowable<T>.combineLatestSlaves(
  slaveSource: Flowable<T1>,
  combineFunction: (T, T1) -> R
): Flowable<R> = combineLatestSlaves(Functions.toFunction(combineFunction), slaveSource)

/**
 * Combine this Flowable with two slave sources.
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
fun <T, T1, T2, R> Flowable<T>.combineLatestSlaves(
  slaveSource1: Flowable<T1>,
  slaveSource2: Flowable<T2>,
  combineFunction: (T, T1, T2) -> R
) =
  combineLatestSlaves(Functions.toFunction(combineFunction), slaveSource1, slaveSource2)

/**
 * Combine this Flowable with three slave sources.
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
fun <T, T1, T2, T3, R> Flowable<T>.combineLatestSlaves(
  slaveSource1: Flowable<T1>,
  slaveSource2: Flowable<T2>,
  slaveSource3: Flowable<T3>,
  combineFunction: (T, T1, T2, T3) -> R
) =
  combineLatestSlaves(Functions.toFunction(combineFunction), slaveSource1, slaveSource2, slaveSource3)

/**
 * Combine this Flowable with many slave sources.
 */
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
fun <T : U, U, R> Flowable<T>.combineLatestSlaves(
  combiner: Function<in Array<Any>, out R>,
  vararg slaveSources: Publisher<out U>
): Flowable<R> =
  combineLatestSlaves(slaveSources, combiner, bufferSize())

/**
 * Combine this Flowable with many slave sources.
 *
 * This function is identical of using combineLatest with this and the slave sources except with the following changes:
 * - If this Flowable completes, the resulting Flowable completes even if the slave sources are still running.
 * - If a slave source completes before this Flowable, a SlaveCompletedPrematurelyError error is triggered.
 */
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
fun <T : U, U, R> Flowable<T>.combineLatestSlaves(
  slaveSources: Array<out Publisher<out U>>,
  combiner: Function<in Array<Any>, out R>,
  bufferSize: Int
): Flowable<R> {
  val masterCompleted = Throwable()

  val sources = Array<Publisher<out U>>(slaveSources.size + 1) {
    when (it) {
      0 -> Flowable.error<U>(masterCompleted).startWith(this)
      else -> Flowable.error<U> { SlaveCompletedPrematurelyError(slaveSources[it - 1].toString()) }.startWith(
        slaveSources[it - 1]
      )
    }
  }

  return combineLatest(sources, combiner, bufferSize).onErrorComplete { it == masterCompleted }
}

/**
 * Errors encountered in the stream for which the provided `predicate` returns true will be silently turned into graceful completion.
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
inline fun <T> Flowable<T>.onErrorComplete(crossinline predicate: (Throwable) -> Boolean): Flowable<T> =
  onErrorResumeNext { error: Throwable ->
    if (predicate(error)) Flowable.empty<T>() else Flowable.error<T>(
      error
    )
  }
...