Kotlin сопрограмм на молнии три потока - PullRequest
1 голос
/ 30 марта 2020

Имеется функция zip для сжатия двух Flows. Есть ли что-то для zip three (или более) Flows вместе?

Если нет, можете ли вы помочь мне реализовать для него функцию расширения? Что-то вроде:

flow.zip(flow2, flow3) { a, b, c -> 

}

Ответы [ 2 ]

1 голос
/ 30 марта 2020

Вы можете проверить реализацию оператора zip и попытаться скопировать / эмулировать , как это работает, адаптируя его к вашим потребностям.

Протестируйте его и внесите все необходимые изменения. нужно

fun <T1, T2, T3, R> Flow<T1>.zip(flow2: Flow<T2>, flow3: Flow<T3>, transform: suspend (T1, T2, T3) -> R): Flow<R> = channelFlow {

    val first: ReceiveChannel<T1> = produce {
        this@zip.collect {
            channel.send(it)
        }
    }

    val second: ReceiveChannel<T2> = produce {
        flow2.collect {
            channel.send(it)
        }
    }

    val third: ReceiveChannel<T3> = produce {
        flow3.collect {
            channel.send(it)
        }
    }

    (second as SendChannel<*>).invokeOnClose {
        if (!first.isClosedForReceive) first.cancel(MyFlowException())
        if (!third.isClosedForReceive) third.cancel(MyFlowException())
    }

    (third as SendChannel<*>).invokeOnClose {
        if (!first.isClosedForReceive) first.cancel(MyFlowException())
        if (!second.isClosedForReceive) second.cancel(MyFlowException())
    }

    val otherIterator = second.iterator()
    val anotherIterator = third.iterator()

    try {
        first.consumeEach { value ->
            if (!otherIterator.hasNext() || !anotherIterator.hasNext()) {
                return@consumeEach
            }
            send(transform(value, otherIterator.next(), anotherIterator.next()))
        }
    } catch (e: MyFlowException) {
        // complete
    } finally {
        if (!second.isClosedForReceive) second.cancel(MyFlowException())
        if (!third.isClosedForReceive) third.cancel(MyFlowException())
    }
}

class MyFlowException: CancellationException()

Использование:

flow1.zip(flow2, flow3) { a, b, c ->
    //Do your work
}...
0 голосов
/ 30 марта 2020

Я не проверял это, но вы можете попробовать. Существует много базового кода для zip, поэтому, чтобы использовать это, я заархивирую первые два потока в поток пар, а затем перенесу поток пар в третий поток. Но лямбда, переданная этой функции, получает первые две уже разделенных, поэтому ей не нужно знать о промежуточном шаге Pair.

fun <T1, T2, T3, R> zip(
    first: Flow<T1>,
    second: Flow<T2>,
    third: Flow<T3>,
    transform: suspend (T1, T2, T3) -> R
): Flow<R> =
    first.zip(second) { a, b -> a to b }
        .zip(third) { (a, b), c ->
            transform(a, b, c)
        }

Использование так:

zip(flow1, flow2, flow3) { a, b, c ->
    Triple(a, b, c)
}

И Вот непроверенная версия для произвольного числа потоков, но они должны быть одного типа:

fun <T, R> zip(
    vararg flows: Flow<T>,
    transform: suspend (List<T>) -> R
): Flow<R> = when(flows.size) {
    0 -> error("No flows")
    1 -> flows[0].map{ transform(listOf(it)) }
    2 -> flows[0].zip(flows[1]) { a, b -> transform(listOf(a, b)) }
    else -> {
        var accFlow: Flow<List<T>> = flows[0].zip(flows[1]) { a, b -> listOf(a, b) }
        for (i in 2 until flows.size) {
            accFlow = accFlow.zip(flows[i]) { list, it ->
                list + it
            }
        }
        accFlow.map(transform)
    }
}
...