Kotlin Flow onBackpressureDrop RxJava2 аналог - PullRequest
5 голосов
/ 25 января 2020

В Rx Java 2 Текучие существуют различные стратегии противодавления, среди которых наиболее интересными являются:

  • ПОСЛЕДНИЕ
  • БУФЕР
  • DROP

, которые соблюдаются во всей цепочке Rx.

В Kotlin есть Flow, который заявляет, что он поддерживает обратное давление "из коробки". Мне удалось настроить Flow на использование стратегий BUFFER и LATEST, используя следующее:

Для BUFFER:

observeFlow()
    .buffer(10)
    .collect { ... }

С LATEST:

observeFlow()
    .conflate()
    .collect { ... }

Что просто ярлык на тот же оператор буфера.

Но я не смог найти ничего, что могло бы работать так же, как DROP. Короче говоря, DROP удалит любое значение, которое приходит в поток, когда предыдущее значение еще не было обработано. А с Flow я даже не уверен, что это вообще возможно.

Рассматривая случай:

observeFlow()
    .backpressureDrop() // non-existent operator, just for illustrative purposes
    .map { ... }
    .flatMapMerge { ... }
    .collect { ... }

Так что backpressureDrop должен уважать любую работу, которая выполняется ниже в потоке, пока этот оператор ничего не знаю о том, что происходит ниже (без явного обратного вызова снизу - как метод request в Rx Java Subscriber). Поэтому кажется, что это невозможно. И этот оператор не должен проходить через какое-либо событие до того, как был собран предыдущий элемент.

Есть ли какой-нибудь готовый оператор, который мне не хватает, или существует простой способ реализовать что-то подобное в существующем API?

Ответы [ 3 ]

3 голосов
/ 26 января 2020

Существует ли простой способ реализовать что-то вроде этого

Зависит от вашей меры прямо. Вот как я бы это сделал.

Противодавление означает программную приостановку и возобновление в мире сопрограмм. Для onBackpressureDrop нисходящий поток должен указать, что он готов к одному элементу, и приостановить его, в то время как восходящий поток никогда не должен ждать готовности нисходящего потока.

Вы должны использовать восходящий поток неограниченным образом и передать элементы и события терминала нисходящему потоку в ожидании этих сигналов.

package hu.akarnokd.kotlin.flow.impl

import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

@FlowPreview
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
 : AbstractFlow<T>() {
    @ExperimentalCoroutinesApi
    @InternalCoroutinesApi
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        coroutineScope {
            val consumerReady = AtomicBoolean()
            val producerReady = Resumable()
            val value = AtomicReference<T>()
            val done = AtomicBoolean()
            val error = AtomicReference<Throwable>();

            launch {
                try {
                    source.collect {
                        if (consumerReady.get()) {
                            value.set(it);
                            consumerReady.set(false);
                            producerReady.resume();
                        }
                    }
                    done.set(true)
                } catch (ex: Throwable) {
                    error.set(ex)
                }
                producerReady.resume()
            }

            while (true) {
                consumerReady.set(true)
                producerReady.await()

                val d = done.get()
                val ex = error.get()
                val v = value.getAndSet(null)

                if (ex != null) {
                    throw ex;
                }
                if (d) {
                    break;
                }

                collector.emit(v)
            }
        }
    }
}

Примечание: Возобновляемая реализация.

Итак, давайте рассмотрим реализацию.

Во-первых, необходимо 5 переменных для передачи информации между коллектором восходящего потока и коллектором, работающим для нижестоящего потока: - consumerReady указывает, что нисходящий поток готов к следующему элементу, - producerReady указывает, что производитель сохранил следующий элемент (или сигнал терминала) и нисходящий поток могут возобновиться - value готовый к употреблению элемент восходящего потока - done конец восходящего потока - error сбой восходящего потока

Далее мы должны запустить сборщик для восходящего потока, потому что сборка приостановлена ​​и не позволит нижестоящему потребителю oop запускать до завершения. В этом сборщике мы проверяем, готов ли нисходящий потребитель (через consumerReady) и, если это так, сохраняем текущий элемент, очищаем флаг готовности и сообщаем о его доступности через producerReady. Очистка consumerReady предотвратит сохранение последующих элементов восходящего потока до тех пор, пока сам нижний поток не покажет новую готовность.

Когда восходящий поток заканчивается или падает, мы устанавливаем переменные done или error и указываем производителя говорил.

После части launch { } мы будем продолжать использовать общие переменные от имени нижестоящего коллектора.

Первое, что нужно сделать в каждом раунде, это указать, что мы готовы к следующему значению, затем дождаться сигнала стороны производителя, что он поместил следующее событие в общую переменную (и).

Далее мы собираем значения из этих переменных. Мы стремимся к полному завершению или кидаем ошибку, и только в крайнем случае пересылаем вышестоящий элемент в нижестоящий коллектор.

2 голосов
/ 08 февраля 2020

Мы можем построить это, используя Поток, поддерживаемый Каналом Рандеву .

Когда емкость равна 0 - создается RendezvousChannel. Этот канал не имеет никакого буфера вообще. Элемент передается от отправителя к получателю только тогда, когда вызовы отправки и получения встречаются во времени (рандеву), поэтому отправка приостанавливается до тех пор, пока другая сопрограмма не вызовет получение и получение приостановит, пока другая сопрограмма не вызовет отправку.

Канал рандеву не имеет буфера Поэтому потребители этого канала должны быть приостановлены и ожидать следующего элемента, чтобы элемент был отправлен в этот канал. Мы можем использовать это качество для отбрасывания значений, которые не могут быть приняты без приостановки канала, используя Channel.offer, что является обычной функцией без приостановки.

Channel.offer

Добавляет элемент в эту очередь, если это можно сделать немедленно, без нарушения ограничений по емкости, и возвращает значение true. В противном случае, он немедленно возвращает false или выдает исключение, если канал isClosedForSend (подробности см. В разделе close).

Поскольку channelFlow буферизован, нам необходимо применить Flow<T>.buffer ниже по потоку к 0.

/**
 * Consume this [Flow] using a channelFlow with no buffer. Elements emitted from [this] flow
 * are offered to the underlying [channelFlow]. If the consumer is not currently suspended and 
 * waiting for the next element, the element is dropped. 
 * 
 * @return a flow that only emits elements when the downstream [Flow.collect] is waiting for the next element
 */
fun <T> Flow<T>.drop(): Flow<T> = channelFlow {
    collect { offer(it) }
}.buffer(capacity = 0)

Вот пример того, как медленный потребитель может использовать это, чтобы отбрасывать элементы.

fun main() = runBlocking {
    flow {
        (0..100).forEach {
            emit(it)
            delay(100)
        }
    }.drop().collect {
        delay(1000)
        println(it)
    }
}

с соответствующим выводом:

0
11
21
31
41
51
61
71
81
91
1 голос
/ 08 февраля 2020

Из комментария здесь , сделанного Антоном Спансом, есть способ эмулировать падение с помощью channelFlow.
Но проблема в том, что по умолчанию channelFlow строитель использует стратегию BUFFER и не не позволяют параметризовать емкость.
Существует способ параметризации емкости в ChannelFlowBuilder, но проблема заключается в том, что API является внутренним, а ChannelFlowBuilder является частным.
Но, по сути, при реализации копирования-вставки ChannelFlowBuilder и создайте класс следующим образом:

class BackPressureDropFlow<T>(private val source: Flow<T>) : AbstractFlow<T>() {

    @InternalCoroutinesApi
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        ChannelFlowBuilder<T>({ source.collect { offer(it) } }, capacity = 0)
            .collect { collector.emit(it) }
    }
}

(или напрямую примените аналогичное решение как преобразование).
Тогда это, похоже, сработает.
Основной ключ здесь - это использование capacity = 0, которое говорит, что в нисходящем направлении будет приостановлено для каждого полученного элемента (так как буферная емкость отсутствует).

...