Как объединить исходную очередь потока akka с графиком? - PullRequest
0 голосов
/ 28 июня 2018

У меня очередь нужно транслировать и объединять с использованием потоковых графиков akka. введите описание изображения здесь

Я нашел графическое демо и демо очереди. и не знаю, как объединить их . Может кто-нибудь мне помочь? Спасибо

Вот пример графика

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: 
GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(1 to 10)
  val out = Sink.ignore

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val f1, f2, f3, f4 = Flow[Int].map(_ + 10)

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
  bcast ~> f4 ~> merge
  ClosedShape
})

А вот демоверсия очереди

val bufferSize = 5
val elementsToProcess = 3

val queue = Source
  .queue[Int](bufferSize, OverflowStrategy.backpressure)
  .throttle(elementsToProcess, 3.second)
  .map(x ⇒ x * x)
  .toMat(Sink.foreach(x ⇒ println(s"completed $x")))(Keep.left)
  .run()

val source = Source(1 to 10)

implicit val ec = system.dispatcher
source.mapAsync(1)(x ⇒ {
  queue.offer(x).map {
    case QueueOfferResult.Enqueued    ⇒ println(s"enqueued $x")
    case QueueOfferResult.Dropped     ⇒ println(s"dropped $x")
    case QueueOfferResult.Failure(ex) ⇒ println(s"Offer failed 
${ex.getMessage}")
    case QueueOfferResult.QueueClosed ⇒ println("Source Queue closed")
  }
}).runWith(Sink.ignore)

Я хочу запустить график, который возвращает очередь, чтобы я мог предложить ей элемент. Спасибо

1 Ответ

0 голосов
/ 29 июня 2018

Ваш val queue является результатом запуска "очереди" (которая стала RunnableGraph через комбинатор toMat). Ваш g также RunnableGraph (который вы можете назвать запустить). Предложение элемента для такого графа означает определение Source, который пропускает элементы ниже по потоку. То, что вы можете объединить, это различные компоненты, составляющие такой работающий граф. Это требует Source и Sink и может иметь произвольное количество Flow компонентов между ними. Я бы посоветовал вам пройти через официальную документацию для потоков akka , чтобы понять, как они работают в целом, и посмотреть на раздел пользовательских графиков в частности.

...