Противодавление акка-потоков в эфире с асинхронной обработкой - PullRequest
0 голосов
/ 29 декабря 2018

Мне трудно понять, применяет ли akka-stream принудительное противодействие на источнике, когда широковещательная передача с одной ветвью занимает много времени (асинхронно) в графе.

Я пробовал buffer и batchчтобы увидеть, было ли какое-либо противодавление, примененное к источнику, но оно не похоже на это.Я также попытался сбросить System.out, но это ничего не меняет.

object Test extends App {
/* Necessary for akka stream */
implicit val system = ActorSystem("test")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val in = Source.tick(0 seconds, 1 seconds, 1)
        in.runForeach(i => println("Produced " + i))

    val out = Sink.foreach(println)
    val out2 = Sink.foreach[Int]{ o => println(s"2 $o") }

    val bcast = builder.add(Broadcast[Int](2))

    val batchedIn: Source[Int, Cancellable] = in.batch(4, identity) {
        case (s, v) => println(s"Batched ${s+v}"); s + v
    }

    val f2 = Flow[Int].map(_ + 10)
    val f4 = Flow[Int].map { i => Thread.sleep(2000); i}

    batchedIn ~> bcast ~> f2 ~> out
                 bcast ~> f4.async ~> out2
    ClosedShape
})

g.run()
}

Я ожидал бы увидеть "Batched ..." в консоли, когда я запускаю программу и в какой-то момент ее получаюна мгновение застрял, потому что F4 не достаточно быстро, чтобы обработать значения.На данный момент ни один из них не ведет себя так, как ожидалось, так как числа генерируются непрерывно, и пакет не выполняется.

РЕДАКТИРОВАТЬ: Я заметил, что через некоторое время пакетные сообщения начинают распечатыватьсяв консоли.Я до сих пор не знаю, почему это не происходит раньше, поскольку обратное давление должно произойти для первых элементов

1 Ответ

0 голосов
/ 29 декабря 2018

Причиной, объясняющей это поведение, являются внутренние буферы, которые вводятся akka при установке асинхронных границ.

Буферы для асинхронных операторов

внутренние буферыкоторые вводятся в качестве оптимизации при использовании асинхронных операторов.


Несмотря на то, что конвейерная обработка в целом увеличивает пропускную способность, на практике существует стоимость передачи элемента через асинхронный (и, следовательно, потокпересечение) граница, которая является существенной.Для амортизации этой стоимости Akka Streams использует внутреннюю стратегию пакетного противодавления.Это окно, потому что, в отличие от протокола Stop-And-Wait, несколько элементов могут быть «в полете» одновременно с запросами элементов.Это также пакетная обработка, поскольку новый элемент не запрашивается сразу после того, как элемент был извлечен из буфера окна, но запрашивается несколько элементов после того, как несколько элементов было очищено .Эта стратегия пакетирования снижает стоимость передачи сигнала обратного давления через асинхронную границу.

Я понимаю, что это игрушечный поток, но если вы объясните, в чем ваша цель, я постараюсь вам помочь.

Вам нужно mapAsync вместо async

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import akka.stream.scaladsl.GraphDSL.Implicits._

  val in = Source.tick(0 seconds, 1 seconds, 1).map(x => {println(s"Produced ${x}"); x})

  val out = Sink.foreach[Int]{ o => println(s"F2 processed $o") }
  val out2 = Sink.foreach[Int]{ o => println(s"F4 processed $o") }

  val bcast = builder.add(Broadcast[Int](2))

  val batchedIn: Source[Int, Cancellable] = in.buffer(4,OverflowStrategy.backpressure)

  val f2 = Flow[Int].map(_ + 10)
  val f4 = Flow[Int].mapAsync(1) { i => Future { println("F4 Started Processing"); Thread.sleep(2000); i }(system.dispatcher) }

  batchedIn ~> bcast ~> f2 ~> out
  bcast ~> f4 ~> out2
  ClosedShape
}).run()
...