Akka Stream: В чем разница между Unzip и Broadcast? - PullRequest
1 голос
/ 21 марта 2019

Я пытаюсь добиться чего-то вроде этого:

graph

Я пытаюсь создать этот поток, используя Flow.fromGraph

  • Я могу сделать join, используя Zip[B, C], который принимает 2 потока
  • Я могу сделать split двумя способами:
    • , используя Broadcast[A](2)
    • используя UnZip[(A,A)], которому предшествует .map(a -> (a, a))

И map(f1), и map(f2) - это пользовательские потоки, которые я получаю из включенных библиотек, поэтому я могу 'действительно не изменяйте их, поэтому, пожалуйста, не говорите .map(a => (f1(a), f2(a)))

В чем различия между этими двумя случаями, или они эквивалентны? Единственное, что я нашел отличительным, была способность Broadcast отменить только тогда, когда все нисходящие потоки отменяют (eagerCancel = false), что является его поведением по умолчанию, в отличие от UnZip (который делает то, что широковещательно делает с eagerCancel = true)

Кроме того, что происходит в случае сбоевв любом из 2 путей?то есть каково влияние, если для определенного элемента f1 выдает ошибку?

Кроме того, скажем, если у нас нет функции f2 (поэтому нет операции с картой), и мы хотим выдать (b,a) в конце, следует ли заменить f2 потоком идентификаторов, или он может быть пропущен все вместе?(если последнее, вы бы когда-либо использовали поток идентификации?)

val split = builder.add(BroadCast[A](2))
val join = builder.add(Zip[B, A])
val F1: Flow[A, B, NotUsed] = Flow[A].map(f1)
val I = Flow[A].map(identity)

split ~> F1 ~> join.in1
split ~> /* I ~> */ join.in0 // do i need the commented part?

Возможно, это поможет с внутренними буферами / обратным давлением?

...