Объедините несколько источников с одинаковыми материализованными значениями - PullRequest
1 голос
/ 24 марта 2020

Оператор combine в потоках akka имеет следующую подпись:

  def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
      strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]

У меня несколько источников, все с одинаковым Mat. Мне нужно объединить их, сохраняя при этом Mat.

. Поэтому мне нужна функция со следующей сигнатурой:

  def combine[T, U](first: Source[T, Mat], second: Source[T, Mat], rest: Source[T, Mat]*)(
      strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, Seq[Mat]]

Существующий combineMat принимает только два входа. Мне нужно неограниченное количество.

Реализация объединения Akka:

  def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
      strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
    Source.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val c = b.add(strategy(rest.size + 2))
      first ~> c.in(0)
      second ~> c.in(1)

      @tailrec def combineRest(idx: Int, i: Iterator[Source[T, _]]): SourceShape[U] =
        if (i.hasNext) {
          i.next() ~> c.in(idx)
          combineRest(idx + 1, i)
        } else SourceShape(c.out)

      combineRest(2, rest.iterator)
    })

Используется SourceShape, который не поддерживает Mat s, поэтому я не думаю, что здесь будет работать.

Между тем реализация combineMat использует viaMat, который не будет работать для нескольких потоков.

Возможно ли это?

1 Ответ

0 голосов
/ 29 марта 2020

работают следующие работы:

import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{GraphDSL, Source}
import akka.stream.{Graph, SourceShape, UniformFanInShape}

import scala.collection.immutable

object Combine {
  def combine[T, U, Mat](sources: immutable.Seq[Source[T, Mat]])(strategy: Int => Graph[UniformFanInShape[T, U], Mat]): Source[U, immutable.Seq[Mat]] = {
    Source.fromGraph(GraphDSL.create(sources) {
      implicit builder => {
        sourceShapes => {
          val target = builder.add(strategy(sources.size))

          for ((source, index) <- sourceShapes.zipWithIndex) {
            source ~> target.in(index)
          }

          SourceShape(target.out)
        }
      }
    })
  }
}
...