Мне трудно понять, применяет ли akka-stream принудительное противодействие на источнике, когда широковещательная передача с одной ветвью занимает много времени (асинхронно) в графе.
Я пробовал buffer
и batch
чтобы увидеть, было ли какое-либо противодавление, примененное к источнику, но оно не похоже на это.Я также попытался сбросить System.out
, но это ничего не меняет.
object Test extends App {
/* Necessary for akka stream */
implicit val system = ActorSystem("test")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source.tick(0 seconds, 1 seconds, 1)
in.runForeach(i => println("Produced " + i))
val out = Sink.foreach(println)
val out2 = Sink.foreach[Int]{ o => println(s"2 $o") }
val bcast = builder.add(Broadcast[Int](2))
val batchedIn: Source[Int, Cancellable] = in.batch(4, identity) {
case (s, v) => println(s"Batched ${s+v}"); s + v
}
val f2 = Flow[Int].map(_ + 10)
val f4 = Flow[Int].map { i => Thread.sleep(2000); i}
batchedIn ~> bcast ~> f2 ~> out
bcast ~> f4.async ~> out2
ClosedShape
})
g.run()
}
Я ожидал бы увидеть "Batched ..." в консоли, когда я запускаю программу и в какой-то момент ее получаюна мгновение застрял, потому что F4 не достаточно быстро, чтобы обработать значения.На данный момент ни один из них не ведет себя так, как ожидалось, так как числа генерируются непрерывно, и пакет не выполняется.
РЕДАКТИРОВАТЬ: Я заметил, что через некоторое время пакетные сообщения начинают распечатыватьсяв консоли.Я до сих пор не знаю, почему это не происходит раньше, поскольку обратное давление должно произойти для первых элементов