Существует ли простой способ реализовать что-то вроде этого
Зависит от вашей меры прямо. Вот как я бы это сделал.
Противодавление означает программную приостановку и возобновление в мире сопрограмм. Для onBackpressureDrop
нисходящий поток должен указать, что он готов к одному элементу, и приостановить его, в то время как восходящий поток никогда не должен ждать готовности нисходящего потока.
Вы должны использовать восходящий поток неограниченным образом и передать элементы и события терминала нисходящему потоку в ожидании этих сигналов.
package hu.akarnokd.kotlin.flow.impl
import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
@FlowPreview
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
: AbstractFlow<T>() {
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
override suspend fun collectSafely(collector: FlowCollector<T>) {
coroutineScope {
val consumerReady = AtomicBoolean()
val producerReady = Resumable()
val value = AtomicReference<T>()
val done = AtomicBoolean()
val error = AtomicReference<Throwable>();
launch {
try {
source.collect {
if (consumerReady.get()) {
value.set(it);
consumerReady.set(false);
producerReady.resume();
}
}
done.set(true)
} catch (ex: Throwable) {
error.set(ex)
}
producerReady.resume()
}
while (true) {
consumerReady.set(true)
producerReady.await()
val d = done.get()
val ex = error.get()
val v = value.getAndSet(null)
if (ex != null) {
throw ex;
}
if (d) {
break;
}
collector.emit(v)
}
}
}
}
Примечание: Возобновляемая реализация.
Итак, давайте рассмотрим реализацию.
Во-первых, необходимо 5 переменных для передачи информации между коллектором восходящего потока и коллектором, работающим для нижестоящего потока: - consumerReady
указывает, что нисходящий поток готов к следующему элементу, - producerReady
указывает, что производитель сохранил следующий элемент (или сигнал терминала) и нисходящий поток могут возобновиться - value
готовый к употреблению элемент восходящего потока - done
конец восходящего потока - error
сбой восходящего потока
Далее мы должны запустить сборщик для восходящего потока, потому что сборка приостановлена и не позволит нижестоящему потребителю oop запускать до завершения. В этом сборщике мы проверяем, готов ли нисходящий потребитель (через consumerReady
) и, если это так, сохраняем текущий элемент, очищаем флаг готовности и сообщаем о его доступности через producerReady
. Очистка consumerReady
предотвратит сохранение последующих элементов восходящего потока до тех пор, пока сам нижний поток не покажет новую готовность.
Когда восходящий поток заканчивается или падает, мы устанавливаем переменные done
или error
и указываем производителя говорил.
После части launch { }
мы будем продолжать использовать общие переменные от имени нижестоящего коллектора.
Первое, что нужно сделать в каждом раунде, это указать, что мы готовы к следующему значению, затем дождаться сигнала стороны производителя, что он поместил следующее событие в общую переменную (и).
Далее мы собираем значения из этих переменных. Мы стремимся к полному завершению или кидаем ошибку, и только в крайнем случае пересылаем вышестоящий элемент в нижестоящий коллектор.