Динамически объединять потоки Акка - PullRequest
1 голос
/ 05 июля 2019

Я пытаюсь использовать потоки Akka для создания вспомогательной шины паба следующим образом:

Publisher добавляет исходный поток для темы, а подписчики указывают тему и получают все для этой темы.Однако тема может быть опубликована несколькими издателями, и как издатель, так и подписчики могут присоединиться в любой момент.

Я хотел объединить все источники и затем вернуть отфильтрованный источник подписчику.

Однако, поскольку издатели могут присоединиться в любой момент, источники могут быть добавлены после подписки, и подписчик должен получать от нее данные, как и любые другие опубликованные данные по теме.

Есть ли способ динамически управлять слиянием потоков с источником, чтобы выполнялось следующее:

publish (topic: String, messages: Source [T]) подписка (topic:String): Source [T]

Таким образом, что независимо от того, когда издатель добавлен, подписчик в теме получит все сообщения, опубликованные в любом источнике, связанном с темой, после подписки.

Рад слышать и об альтернативных подходах.

Спасибо, Z

Ответы [ 2 ]

1 голос
/ 06 июля 2019

Возможно, вы захотите взглянуть на эту документацию Akka: построение dynamic pub-sub service с использованием MergeHub и BroadcastHub.

Вот пример кода для использования MergeHub и BroadcastHub в качестве динамических разворотных и разветвленных переходов соответственно.

Идея состоит в том, чтобы соединить MergeHub с BroadcastHub, чтобы сформировать паб-субканал в виде потока через Flow.fromSinkAndSource:

val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize).
  toMat(BroadcastHub.sink[String](bufferSize))(Keep.both).
  run

val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)

Обратите внимание, что Keep.both в приведенном выше фрагменте создает кортеж материализованных значений (Sink[T, NotUsed], Source[T, NotUsed]) из MergeHub.source[T] и BroadcastHub.sink[T], которые имеют следующие сигнатуры методов:

object MergeHub {
  def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = // ...
  // ...
}

object BroadcastHub {
  def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = // ...
  // ...
}

Ниже приведен пример кода для простого паб-суб-канала busFlow (аналогично примеру в документе Akka):

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.NotUsed

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize = 32).
  toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both).
  run

// Optional: avoid building up backpressure when there is no subscribers
bfSource.runWith(Sink.ignore)

val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)

Тестирование busFlow:

Source(101 to 103).map(i => s"Batch(A)-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  viaMat(busFlow)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer(1)-" + s) }).
  run

Source(104 to 105).map(i => s"Batch(B)-$i").
  viaMat(busFlow)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer(2)-" + s) }).
  run

// Consumer(2)-Batch(B)-104
// Consumer(2)-Batch(B)-105
// Consumer(1)-Batch(B)-104
// Consumer(1)-Batch(B)-105
// Consumer(1)-Batch(A)-101
// Consumer(1)-Batch(A)-102
// Consumer(2)-Batch(A)-101
// Consumer(1)-Batch(A)-103
// Consumer(2)-Batch(A)-102
// Consumer(2)-Batch(A)-103

Служа в качестве паб-подканала, ввод busFlow публикуется через bfSink для всех подписчиков, в то время как его выходные данные передаются через bfSource все опубликованные элементы. Например:

val p1 = Source.tick[Int](0.seconds, 5.seconds, 5).map(_.toString)
p1.runWith(bfSink)

val p2 = Source.tick[Int](2.seconds, 10.seconds, 10).map(_.toString)
p2.runWith(bfSink)

val s1 = bfSource
s1.runForeach(x => println(s"s1 --> $x"))

val s2 = bfSource
s2.runForeach(x => println(s"s2 --> $x"))

// s1 --> 5
// s2 --> 5
// s1 --> 10
// s2 --> 10
// s2 --> 5
// s1 --> 5
// s2 --> 5
// s1 --> 5
// s1 --> 10
// s2 --> 10
// s2 --> 5
// s1 --> 5
// ...

Другие соответствующие темы, которые могут представлять интерес, включают KillSwitch для контроля завершения потока и PartitionHub для маршрутизации элементов потока от данного производителя к динамическому набору потребителей.

0 голосов
/ 07 июля 2019

Вот что я в итоге сделал. Как издатели, так и подписчики могут приходить и исчезать, и независимо от того, когда подписчик присоединяется и когда присоединяется издатель, подписчик должен иметь возможность видеть все опубликованные сообщения для своей подписки (по темам) независимо от того, какие издатели были активны на момент подписки. сделал. Комментарии приветствуются.

def main(args: Array[String]): Unit = {
   val actorSystem = ActorSystem("test")
   val materializerSettings = ActorMaterializerSettings(actorSystem)
   implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
   implicit val ec: ExecutionContext = actorSystem.dispatcher

   val (queue, pub) = Source.queue[Int](100, akka.stream.OverflowStrategy.dropHead).toMat(Sink.asPublisher(true))(Keep.both).run()

   val p1 = Source.tick[Int](0.seconds, 5.seconds, 5)

   p1.runForeach(x=> {queue.offer(x)})

   val p2= Source.tick[Int](2.seconds,10.seconds, 10)
   p2.runForeach(x=> queue.offer(x))

   val s1 = Source.fromPublisher(pub)
   s1.runForeach(x=> println(s"s1 =======> ${x}"))

   val s2 = Source.fromPublisher(pub)
   s2.runForeach(x=> println(s"s2 =======> ${x}"))
}
...