Я исследую модель Kotlin производитель-потребитель, используя совместные подпрограммы Kotlin. В моем случае использования у меня есть один производитель, который производит IP-пакеты, и я пересылаю их, используя DatagramChannel
или SocketChannel
, в зависимости от того, был ли это сегмент UDP
или TCP
соответственно. Чтобы визуально выразиться, производитель дает мне (IP Header|Transport Header|Payload)
, который представлен классом пользовательских данных IPDatagram
в приведенном ниже коде.
fun start() {
forward(ipPkts())
}
fun ipPkts() = scope.produce<Pair<DatagramType, IPDatagram>> {
while(isActive) {
val pkt = readVpnIf0()
send(Pair(pkt.id, pkt))
}
}
fun forward(channel: ReceiveChannel<Pair<DatagramType, IPDatagram>>) = scope.launch {
channel.consumeEach {
val (type, pkt) = it
when(type.transportProtocol) {
UDP -> udpWrite(pkt)
TCP -> tcpWrite(pkt)
}
}
}
Этот код работает, но я использую один kotlinx.coroutines.channels.Channel
для функций записи TCP и UDP, которые независимы сами по себе. Я сослался на подход fan-out
, представленный в Kotlin Language Guide , но он не совсем подходит для моего варианта использования для создания нескольких потребителей, поскольку сегменты UDP и TCP будут чередоваться.
Вопрос : Есть ли способ иметь несколько потребителей и выборочно принимать сообщения от одного Channel
, при условии, что канал имеет capacity
. Другими словами, потребитель TCP перейдет к обработке 2-го пакета, не дожидаясь 1-го, если это сегмент UDP, и наоборот. Пример пакетов в очереди: UDP | TCP | TCP | UDP
.
В настоящее время я использовал два отдельных Channel
для TCP и UDP, но мне кажется, что это немного уродливое решение. С нетерпением жду возможности узнать что-то новое :)