Как вы обрабатываете последовательность источников Akka Stream? - PullRequest
1 голос
/ 24 января 2020

У нас есть Sink, который может обрабатывать события:

def parseEvent(): Sink[T, Future[akka.Done]] = {
  Sink.foreach[T] { event => {
    // Do stuff with the event
  }}
}

Это прекрасно работает с одним Source:

val mySource: Source[T] = ...  
mySource.takeWhile( someCheck, true ).runWith(parseEvent)

Как вы работаете, если вы вместо этого:

val mySources: Seq[Source[T]] = ...

Все источники должны работать параллельно, и все события должны достигать parseEvent.

1 Ответ

1 голос
/ 24 января 2020

Что-то в следующих строках должно соответствовать требованиям:

import akka.NotUsed
import akka.stream.scaladsl.{ Concat, Merge, Source }

def sourceFromSources[T](sources: Seq[Source[T, NotUsed]]): Source[T, NotUsed] =
  sources.size match {
    case s if s < 1 => Source.empty[T]
    case 1 => sources.head
    case 2 => sources.head.merge(sources(1))
    case _ => Source.combine(sources.head, sources(1), sources(2), sources.drop(2): _*)(Merge(_))
  }

Стратегия объединения «объединяет несколько потоков, отбирая элементы по мере их поступления из входных потоков» и выбирает случайным образом, если несколько потоков имеют доступные элементы. Противодавление распространяется от нижестоящих к восходящим.

...