У нас есть Sink
, который может обрабатывать события:
def parseEvent(): Sink[T, Future[akka.Done]] = {
Sink.foreach[T] { event => {
// Do stuff with the event
}}
}
Это прекрасно работает с одним Source
:
val mySource: Source[T] = ...
mySource.takeWhile( someCheck, true ).runWith(parseEvent)
Как вы работаете, если вы вместо этого:
val mySources: Seq[Source[T]] = ...
Все источники должны работать параллельно, и все события должны достигать parseEvent
.