Akka Stream Parallelism - PullRequest
       117

Akka Stream Parallelism

0 голосов
/ 30 мая 2020

Я пытался распараллелить поток в Akka Stream согласно документации [1], но по какой-то причине не получил ожидаемого результата.

Я выполнил шаги, описанные в документации, и не думаю, что что-то пропустил. Тем не менее, все вычисления моего потока происходят последовательно одно за другим.

Что мне здесь не хватает?

[1] https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html

import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Source}

object ScalaParallell extends App {

  implicit val system = ActorSystem("QuickStart")

  def longRunningComputation(x: Int): Int = {
    println(s"Computing 1 ${x}")
    Thread.sleep(10000)
    println(s"Computation 1 ${x} done")
    x
  }
  def longRunningComputation2(x: Int): Int = {
    println(s"Computing 2 ${x}")
    Thread.sleep(10000)
    println(s"Computation 2 ${x} done")
    x
  }

  val processor: Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // prepare graph elements
      val balance = b.add(Balance[Int](2))
      val merge = b.add(Merge[Int](2))
      val f = Flow[Int].map(longRunningComputation)
      val f2 = Flow[Int].map(longRunningComputation2)


      // connect the graph
      balance.out(0) ~> f.async ~> merge.in(0)
      balance.out(1) ~> f2.async ~> merge.in(1)

      // expose ports
      FlowShape(balance.in, merge.out)
    })


  // Wire it all up.
  val xs = List(1,2,3)
  val source: Source[Int, NotUsed] = Source(xs)
  source.via(processor).runForeach(println)


  Thread.sleep(5000)
}

Пример вывода

Computing 2 1
Computation 2 1 done
Computing 2 2
1
Computation 2 2 done
Computing 2 3
2
Computation 2 3 done
3

Я ожидал, что два вычисления будут выполняться одновременно. Например:

Computing 1 1
Computing 1 2
Computation 1 2 done
Computing 1 3
Computation 1 1 done
Computing 2 4
1
2
..

1 Ответ

1 голос
/ 30 мая 2020

Попробуйте удалить Thread.sleep внутри longRunningComputation и longRunningComputation2 и установить xs на что-то гораздо большее, например 1 to 100, тогда вы сможете наблюдать за параллельной обработкой. Не уверен, почему, но блокировка Thread.sleep определенно считается анти-паттерном в akka

...