Я построил akka graph DSL, определяющий простой поток.Но поток f4 отправляет элемент за 3 секунды, а f2 - за 10 секунд.
В результате я получил: 3, 2, 3, 2. Но это не то, что я хочу.Поскольку f2 занимает слишком много времени, я хотел бы получить: 3, 3, 2, 2. Вот код ...
implicit val actorSystem = ActorSystem("NumberSystem")
implicit val materializer = ActorMaterializer()
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source(List(1, 1))
val out = Sink.foreach(println)
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val yourMapper: Int => Future[Int] = (i: Int) => Future(i + 1)
val yourMapper2: Int => Future[Int] = (i: Int) => Future(i + 2)
val f1, f3 = Flow[Int]
val f2= Flow[Int].throttle(1, 10.second, 0, ThrottleMode.Shaping).mapAsync[Int](2)(yourMapper)
val f4= Flow[Int].throttle(1, 3.second, 0, ThrottleMode.Shaping).mapAsync[Int](2)(yourMapper2)
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
ClosedShape
})
g.run()
Так, где я иду не так?С будущим или mapAsync?или еще ... Спасибо