Оператор combine
в потоках akka имеет следующую подпись:
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]
У меня несколько источников, все с одинаковым Mat
. Мне нужно объединить их, сохраняя при этом Mat
.
. Поэтому мне нужна функция со следующей сигнатурой:
def combine[T, U](first: Source[T, Mat], second: Source[T, Mat], rest: Source[T, Mat]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, Seq[Mat]]
Существующий combineMat
принимает только два входа. Мне нужно неограниченное количество.
Реализация объединения Akka:
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
Source.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val c = b.add(strategy(rest.size + 2))
first ~> c.in(0)
second ~> c.in(1)
@tailrec def combineRest(idx: Int, i: Iterator[Source[T, _]]): SourceShape[U] =
if (i.hasNext) {
i.next() ~> c.in(idx)
combineRest(idx + 1, i)
} else SourceShape(c.out)
combineRest(2, rest.iterator)
})
Используется SourceShape
, который не поддерживает Mat
s, поэтому я не думаю, что здесь будет работать.
Между тем реализация combineMat
использует viaMat
, который не будет работать для нескольких потоков.
Возможно ли это?