Вы можете сделать это, объединив однопоточный планировщик с оператором debounce:
class ManualExecutor : Executor {
private val tasks = ArrayDeque<Runnable>()
override fun execute(command: Runnable) = tasks.push(command)
fun runAllTasks() {
while (tasks.isNotEmpty()) {
tasks.pop().run()
}
}
}
val a: Observable<A>
val b: Observable<B>
val scheduler = Schedulers.from(ManualExecutor())
val aTransformed = a.observeOn(scheduler).map { transformA(it) }
val aCombinedWithB = combine(a, b).observeOn(scheduler)
val final = combine(aTransformed, aCombinedWithB).debounce(0)
// some time later....
emitA() // now all the updates are queued in our ManualExecutor
scheduler.runAllTasks() // final will only emit once, not twice!
Конечно, это не скомпилируется из коробки, и вам придется возиться с планировщиком и тестами чтобы понять это правильно, но, возможно, идея поможет. Если использование debounce с «нулевым» тайм-аутом кажется слишком хакерским, вы также можете использовать другую сигнатуру, которая обеспечивает полный контроль над периодом debounce с наблюдаемым значением.
Однако, если вы не используете его по назначению. в случае, указанном c только для вышеперечисленного, то вы можете упростить проблему с помощью чего-то подобного:
|-----------------------|
Stream A----->| map A to |
| pair(A, transform(A)) |
|-----------------------|
|
|----->|--------------------|
| Combine A, t(A) |
| and B (latest) |--> ?
| into 3-tuple |
Stream B---------------------->|--------------------|