Как вы справляетесь с фьючерсами и mapAsync в Akka Flow? - PullRequest
0 голосов
/ 13 декабря 2018

Я построил akka graph DSL, определяющий простой поток.Но поток f4 отправляет элемент за 3 секунды, а f2 - за 10 секунд.

В результате я получил: 3, 2, 3, 2. Но это не то, что я хочу.Поскольку f2 занимает слишком много времени, я хотел бы получить: 3, 3, 2, 2. Вот код ...

implicit val actorSystem = ActorSystem("NumberSystem")
implicit val materializer = ActorMaterializer()

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(List(1, 1))
  val out = Sink.foreach(println)

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))



  val yourMapper: Int => Future[Int] = (i: Int) => Future(i + 1)
  val yourMapper2: Int => Future[Int] = (i: Int) => Future(i + 2)

  val f1, f3 = Flow[Int]
  val f2= Flow[Int].throttle(1, 10.second, 0, ThrottleMode.Shaping).mapAsync[Int](2)(yourMapper)
  val f4= Flow[Int].throttle(1, 3.second, 0, ThrottleMode.Shaping).mapAsync[Int](2)(yourMapper2)

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
  bcast ~> f4 ~> merge
  ClosedShape
})
g.run()

Так, где я иду не так?С будущим или mapAsync?или еще ... Спасибо

Ответы [ 3 ]

0 голосов
/ 14 декабря 2018

Извините, я новичок в Акке, поэтому я все еще учусь.Чтобы получить ожидаемые результаты, одним из способов является установка асинхронности:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(List(1, 1))
  val out = Sink.foreach(println)

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))



  val yourMapper: Int => Future[Int] = (i: Int) => Future(i + 1)
  val yourMapper2: Int => Future[Int] = (i: Int) => Future(i + 2)

  val f1, f3 = Flow[Int]
  val f2= Flow[Int].throttle(1, 10.second, 0, ThrottleMode.Shaping).map(_+1)
    //.mapAsyncUnordered[Int](2)(yourMapper)
  val f4= Flow[Int].throttle(1, 3.second, 0, ThrottleMode.Shaping).map(_+2)
    //.mapAsync[Int](2)(yourMapper2)

  in ~> f1 ~> bcast ~> f2.async ~> merge ~> f3 ~> out
  bcast ~> f4.async ~> merge
  ClosedShape
})
g.run()
0 голосов
/ 14 декабря 2018

Как вы уже поняли, замена:

mapAsync(i => Future{i + delta})

на:

map(_ + delta).async

в двух потоках приведет к тому, что вы хотите.

другой результат сводится к ключевой разнице между mapAsync и map + async.В то время как mapAsync разрешает выполнение Futures в параллельных потоках, множественные этапы потока mapAsync все еще управляются одним и тем же базовым актором, который будет выполнять оператор слияния перед выполнением (для экономической эффективности в целом).

С другой стороны, async фактически вводит асинхронную границу в поток потока с отдельными этапами потока, обрабатываемыми отдельными участниками.В вашем случае каждая из двух ступеней потока независимо испускает элементы ниже по потоку, и какой элемент, испущенный первым, расходуется первым.Неизбежно стоит платить за управление потоком через асинхронную границу, и Akka Stream использует стратегию оконной буферизации для амортизации стоимости (см. Этот Akka Stream doc ).

Для получения более подробной информации:Разница между mapAsync и async, эта запись в блоге может представлять интерес.

0 голосов
/ 14 декабря 2018

Итак, вы пытаетесь объединить результаты, полученные из f2 и f4.В этом случае вы пытаетесь сделать то, что иногда называют «паттерном разброса».

Я не думаю, что есть готовые способы его реализации, без добавления пользовательского этапа с отслеживанием состояния, который будет отслеживать выходы из f2 и из f4 и выдавать запись, когда оба доступны.Но есть некоторые сложности, о которых следует помнить:

  • Что случится, если f2 / f4 потерпит неудачу
  • Что произойдет, если они займут слишком много времени
  • Вам необходимо иметьуникальный ключ для каждой входной записи, так что вы знаете, какой выход из f2 соответствует f4 (или наоборот)
...