Kotlin Coroutines - один производитель, несколько потребителей с выборочным потреблением сообщений - PullRequest
0 голосов
/ 28 мая 2020

Я исследую модель Kotlin производитель-потребитель, используя совместные подпрограммы Kotlin. В моем случае использования у меня есть один производитель, который производит IP-пакеты, и я пересылаю их, используя DatagramChannel или SocketChannel, в зависимости от того, был ли это сегмент UDP или TCP соответственно. Чтобы визуально выразиться, производитель дает мне (IP Header|Transport Header|Payload), который представлен классом пользовательских данных IPDatagram в приведенном ниже коде.

fun start() {
    forward(ipPkts())
}

fun ipPkts() = scope.produce<Pair<DatagramType, IPDatagram>> {
    while(isActive) {
        val pkt = readVpnIf0()
        send(Pair(pkt.id, pkt))
    }
}

fun forward(channel: ReceiveChannel<Pair<DatagramType, IPDatagram>>) = scope.launch {
    channel.consumeEach {
        val (type, pkt) = it
        when(type.transportProtocol) {
            UDP -> udpWrite(pkt)
            TCP -> tcpWrite(pkt)
        }
    }
}

Этот код работает, но я использую один kotlinx.coroutines.channels.Channel для функций записи TCP и UDP, которые независимы сами по себе. Я сослался на подход fan-out, представленный в Kotlin Language Guide , но он не совсем подходит для моего варианта использования для создания нескольких потребителей, поскольку сегменты UDP и TCP будут чередоваться.

Вопрос : Есть ли способ иметь несколько потребителей и выборочно принимать сообщения от одного Channel, при условии, что канал имеет capacity. Другими словами, потребитель TCP перейдет к обработке 2-го пакета, не дожидаясь 1-го, если это сегмент UDP, и наоборот. Пример пакетов в очереди: UDP | TCP | TCP | UDP.

В настоящее время я использовал два отдельных Channel для TCP и UDP, но мне кажется, что это немного уродливое решение. С нетерпением жду возможности узнать что-то новое :)

...