Я пытаюсь добиться чего-то вроде этого:
Я пытаюсь создать этот поток, используя Flow.fromGraph
- Я могу сделать
join
, используя Zip[B, C]
, который принимает 2 потока - Я могу сделать
split
двумя способами: - , используя
Broadcast[A](2)
- используя
UnZip[(A,A)]
, которому предшествует .map(a -> (a, a))
И map(f1)
, и map(f2)
- это пользовательские потоки, которые я получаю из включенных библиотек, поэтому я могу 'действительно не изменяйте их, поэтому, пожалуйста, не говорите .map(a => (f1(a), f2(a)))
В чем различия между этими двумя случаями, или они эквивалентны? Единственное, что я нашел отличительным, была способность Broadcast
отменить только тогда, когда все нисходящие потоки отменяют (eagerCancel = false
), что является его поведением по умолчанию, в отличие от UnZip
(который делает то, что широковещательно делает с eagerCancel = true
)
Кроме того, что происходит в случае сбоевв любом из 2 путей?то есть каково влияние, если для определенного элемента f1 выдает ошибку?
Кроме того, скажем, если у нас нет функции f2
(поэтому нет операции с картой), и мы хотим выдать (b,a)
в конце, следует ли заменить f2 потоком идентификаторов, или он может быть пропущен все вместе?(если последнее, вы бы когда-либо использовали поток идентификации?)
val split = builder.add(BroadCast[A](2))
val join = builder.add(Zip[B, A])
val F1: Flow[A, B, NotUsed] = Flow[A].map(f1)
val I = Flow[A].map(identity)
split ~> F1 ~> join.in1
split ~> /* I ~> */ join.in0 // do i need the commented part?
Возможно, это поможет с внутренними буферами / обратным давлением?