Динамичный веер с Akka Streams - PullRequest
0 голосов
/ 29 апреля 2018

Я создаю приложение Akka Streams, которое проходит через несколько шагов. Существует один конкретный шаг, который дает 0 или более результатов, заранее неизвестно, сколько их. Каждый из результатов должен обрабатываться асинхронно (одним и тем же компонентом), и, наконец, все результаты должны быть объединены.

Как мне смоделировать это в Akka Streams? Я заметил, что в GraphDsl есть элемент Broadcast, который позволяет моделировать веер, однако это возможно только тогда, когда заранее известно количество выходов. Есть ли способ в Akka Streams иметь что-то вроде Broadcast, но это дает динамичное количество выходов?

Ответы [ 2 ]

0 голосов
/ 01 мая 2018

Оказывается, mapConcat делает то, что я хочу. Вот POC:

package streams

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.util.Random

object StreamsTest extends App {
  implicit val system = ActorSystem("TestSystem")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  case class SplitRequest(s: String)

  def requestHandlerWithMultipleResults(request: SplitRequest): List[String] = 
   request.s.split(" ").toList

  def slowProcessingTask(s: String) =  {
    Thread.sleep(Random.nextInt(5000))
    s.toUpperCase
  }

  val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val source: Source[String, NotUsed] = Source(List(SplitRequest("january february march april may")))
      .mapConcat(requestHandlerWithMultipleResults)
      .mapAsyncUnordered(5)(s => Future(slowProcessingTask(s)))

    val sink = Sink.foreach(println)

    source ~> sink

    ClosedShape
  })

  g.run()
}

Вывод, например:

MAY
JANUARY
FEBRUARY
MARCH
APRIL
0 голосов
/ 30 апреля 2018

Проверить Концентраторы на этой странице: https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html?language=scala

Во многих случаях потребители или производители определенной услуги (представленной как Sink, Source или, возможно, Flow) являются динамическими и заранее неизвестными. Граф DSL не позволяет представлять это, все соединения графа должны быть известны заранее и должны быть подключены заранее. Для обеспечения динамического потокового ввода и вывода необходимо использовать концентраторы.

...