Я хочу запустить N вложенных потоков / каналов параллельно и отправить каждый элемент только одному из вложенных потоков. Баланс позволяет мне сделать это, но я хочу направить элементы с тем же «ключом» к тому же вложенному потоку или каналу.
Я не вижу никаких функций для этого, поэтому я написал базовый POC, который передает каждый элемент каждому потоку. Затем поток / канал фильтрует только те элементы, с которыми он должен работать (см. Ниже). Это кажется совершенно неэффективным, есть ли лучший способ направить элементы к конкретным вложенным потокам?
package io.xxx.streams
import cats.effect.{ExitCode, IO, IOApp}
import fs2.{Pipe, Stream}
object StreamsApp extends IOApp {
import cats.syntax.functor._
import scala.concurrent.duration._
case class StreamMessage(routingKey: Int, value: String)
// filter elements which belong to the given bin
def filterAndLog(bin: Int, numBins: Int): IO[Pipe[IO, StreamMessage, Unit]] = IO {
val predicate = (m: StreamMessage) => m.routingKey % numBins == bin
in: Stream[IO, StreamMessage] => {
in.filter(predicate).evalMap(m => IO {
println(s"bin $bin - ${m.value}")
})
}
}
override def run(args: List[String]): IO[ExitCode] = {
val effectsStream = for {
pipeOne <- Stream.eval(filterAndLog(0, 2))
pipeTwo <- Stream.eval(filterAndLog(1, 2))
s <- Stream
.fixedDelay[IO](100.millis)
.zipRight(Stream.range(0, 50))
.map(i => StreamMessage(i, s"message $i"))
.broadcastThrough(pipeOne, pipeTwo)
} yield s
effectsStream.compile.drain.as(ExitCode(0))
}
}
Сообщения с одним и тем же ключом маршрутизации должны обрабатываться одним и тем же потоком / каналом