Возможно, вы захотите взглянуть на эту документацию Akka: построение dynamic pub-sub service
с использованием MergeHub
и BroadcastHub
.
Вот пример кода для использования MergeHub
и BroadcastHub
в качестве динамических разворотных и разветвленных переходов соответственно.
Идея состоит в том, чтобы соединить MergeHub
с BroadcastHub
, чтобы сформировать паб-субканал в виде потока через Flow.fromSinkAndSource
:
val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize).
toMat(BroadcastHub.sink[String](bufferSize))(Keep.both).
run
val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)
Обратите внимание, что Keep.both
в приведенном выше фрагменте создает кортеж материализованных значений (Sink[T, NotUsed], Source[T, NotUsed])
из MergeHub.source[T]
и BroadcastHub.sink[T]
, которые имеют следующие сигнатуры методов:
object MergeHub {
def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = // ...
// ...
}
object BroadcastHub {
def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = // ...
// ...
}
Ниже приведен пример кода для простого паб-суб-канала busFlow
(аналогично примеру в документе Akka):
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.NotUsed
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize = 32).
toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both).
run
// Optional: avoid building up backpressure when there is no subscribers
bfSource.runWith(Sink.ignore)
val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)
Тестирование busFlow
:
Source(101 to 103).map(i => s"Batch(A)-$i").
delay(2.seconds, DelayOverflowStrategy.backpressure).
viaMat(busFlow)(Keep.right).
to(Sink.foreach{ case s: String => println("Consumer(1)-" + s) }).
run
Source(104 to 105).map(i => s"Batch(B)-$i").
viaMat(busFlow)(Keep.right).
to(Sink.foreach{ case s: String => println("Consumer(2)-" + s) }).
run
// Consumer(2)-Batch(B)-104
// Consumer(2)-Batch(B)-105
// Consumer(1)-Batch(B)-104
// Consumer(1)-Batch(B)-105
// Consumer(1)-Batch(A)-101
// Consumer(1)-Batch(A)-102
// Consumer(2)-Batch(A)-101
// Consumer(1)-Batch(A)-103
// Consumer(2)-Batch(A)-102
// Consumer(2)-Batch(A)-103
Служа в качестве паб-подканала, ввод busFlow
публикуется через bfSink
для всех подписчиков, в то время как его выходные данные передаются через bfSource
все опубликованные элементы. Например:
val p1 = Source.tick[Int](0.seconds, 5.seconds, 5).map(_.toString)
p1.runWith(bfSink)
val p2 = Source.tick[Int](2.seconds, 10.seconds, 10).map(_.toString)
p2.runWith(bfSink)
val s1 = bfSource
s1.runForeach(x => println(s"s1 --> $x"))
val s2 = bfSource
s2.runForeach(x => println(s"s2 --> $x"))
// s1 --> 5
// s2 --> 5
// s1 --> 10
// s2 --> 10
// s2 --> 5
// s1 --> 5
// s2 --> 5
// s1 --> 5
// s1 --> 10
// s2 --> 10
// s2 --> 5
// s1 --> 5
// ...
Другие соответствующие темы, которые могут представлять интерес, включают KillSwitch
для контроля завершения потока и PartitionHub
для маршрутизации элементов потока от данного производителя к динамическому набору потребителей.