Я не уверен, что это именно то, что вы просили, но у меня есть какое-то решение. Что можно сделать, так это создать все потоки, например, мы можем посмотреть:
val flows = Seq (
Flow.fromFunction[Int, Int](x => { println(s"flow1: Received $x"); x * 2 }),
Flow.fromFunction[Int, Int](x => { println(s"flow2: Received $x"); x + 1}),
Flow.fromFunction[Int, Int](x => { println(s"flow3: Received $x"); x * x})
)
Затем нам нужно добавить к каждому из существующих потоков обработку ошибок. Итак, давайте определим его и добавим к каждому из элементов:
val errorHandling = Flow[Int].filter(_ % 2 == 0)
val errorsHandledFlows = flows.map(flow => flow.via(errorHandling))
Теперь нам нужна вспомогательная функция, которая соединит все наши новые потоки:
def connectFlows(errorsHandledFlows: Seq[Flow[Int, Int, _]]): Flow[Int, Int, _] = {
errorsHandledFlows match {
case Seq() => Flow[Int] // This line is actually redundant, But I don't want to have an unexhausted pattern matching
case Seq(singleFlow) => singleFlow
case head :: tail => head.via(connectFlows(tail))
}
}
И теперь нам нужно выполнить все вместе, например:
Source(1 to 4).via(connectFlows(errorsHandledFlows)).to(Sink.foreach(println)).run()
Будет выдан результат:
flow1: Received 1
flow2: Received 2
flow1: Received 2
flow2: Received 4
flow1: Received 3
flow2: Received 6
flow1: Received 4
flow2: Received 8
Как вы можете заметить, мы фильтруем нечетные числа. Следовательно, первый поток получает все числа от 1 до 4. Второй поток получил 2,4,6,8 (первый поток умножил значения на 2), а последний не получил никакого потока, потому что второй поток делает все значений нечетное.