Zip два потока в потоках Акка - PullRequest
0 голосов
/ 03 июля 2019

у меня два потока:

val a: Flow[Input, Data, NotUsed] =...
val b: Flow[Input, Unit, NotUsed] =...

Первый поток - это поток данных, который мне небезразличен, второй - поток «сигналов», то есть я действительно хочу отправлять Data нисходящий поток только тогда, когда элемент получен в b.

Я думал об использовании чего-то вроде a.zipWith(b)((fromA, fromB) => fromA), но, похоже, это работает только между потоком и источником (несмотря на то, что документация Akka подразумевает, что он также поддерживает сжатые потоки).

Чего мне не хватает?

Спасибо

Ответы [ 2 ]

1 голос
/ 04 июля 2019

Если вы посмотрите на подписи zip и zipWith:

def zip[U](that: Graph[SourceShape[U], _]): Repr[(Out, U)]

def zipWith[Out2, Out3](that: Graph[SourceShape[Out2], _])(combine: (Out, Out2) => Out3): Repr[Out3]

оба метода ожидают Source.

Архивирование Flow с другим Flow не будет таким тривиальным, как можно подумать (например, 2-й Flow может производить несколько элементов на элемент ввода с mapConcat).

Можно рассмотреть возможность создания пользовательского GraphStage, как показано в следующем тривиализированном примере:

case class DataIn(id: Int)
case class DataOut(content: String)
case class Signal(s: Int)

class ZipperFlow extends GraphStage[FlowShape[(DataIn, Signal), DataOut]] {

  val in = Inlet[(DataIn, Signal)]("ZipperFlow.in")
  val out = Outlet[DataOut]("ZipperFlow.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          push(out, DataOut("content-" + grab(in)._1.id))
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}

Тестирование ZipperFlow:

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val dataSource = Source(1 to 5).map(DataIn(_))
val signalSource = Source(1 to 5).map(Signal(_))

val sink: Sink[DataOut, Future[Done]] = Sink.foreach(println)

dataSource.zip(signalSource).via(new ZipperFlow).runWith(sink)

// DataOut(content-1)
// DataOut(content-2)
// DataOut(content-3)
// DataOut(content-4)
// DataOut(content-5)
0 голосов
/ 04 июля 2019

это может быть достигнуто с помощью merge в графиках akka-streams

обновление: правильный zip

пример:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, ClosedShape}


object Application extends App {

  implicit val sys: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()

  val flowX: Flow[Int, String, NotUsed] = Flow[Int].map(i => (i + 10).toString)
  val flowY: Flow[Int, Long, NotUsed] = Flow[Int].map(i => (i * 2).toLong)

  RunnableGraph.fromGraph(GraphDSL.create(flowX, flowY)((_, _)) { implicit builder =>
    (flowX, flowY) =>
      import GraphDSL.Implicits._
      val broadcast = builder.add(Broadcast[Int](2))
      val zip = builder.add(Zip[String, Long])
      Source((1 to 10).toList) ~> broadcast.in

      broadcast ~> flowX ~> zip.in0
      broadcast ~> flowY ~> zip.in1
      zip.out ~> Sink.foreach(println)
      ClosedShape
  }).run()
}

flowX & flowY - параметры для построения графика. в части constructing graph вы можете найти различные случаи разделения и объединения потока (разветвление + разнесение). работать с графиками немного сложнее, чем с линейным потоком. может быть, имеет смысл просто создать частичный граф с формой потока (1 вход, 1 выход) - так что пользователь увидит его как обычный поток (но со скрытой сложностью). лично я бы посоветовал не использовать графики вообще, потому что это сложнее тестировать (там труднее найти ошибку или снижение производительности), хотя в некоторых случаях это отличная возможность

Вы можете найти множество методов для создания графиков с различным количеством параметров. кроме того, вы можете предоставить различные входные параметры для создания графика - разные источники, потоки, приемники.

...