Да, поток должен принимать и выдавать Message
s.
val source: Source[Message, SoMat] = ???
val flow: Flow[Message, Message, FMat] = ???
val sink: Sink[Message, SiMat] = ???
source.via(flow).runWith(sink) // Ignoring which materializations you'd actually want to keep
Это кажется ограничительным, но обратите внимание на некоторые подписи:
// in Source (with types at least partially expanded)
def map[T](f: Message => T): Source[T, SoMat]
// in Flow (with types at least partially expanded)
def map[T](f: Message => T): Flow[Message, T, FMat]
// in Sink (with types at least partially expanded)
def contramap[T](f: T => Message): Sink[T, SiMap]
т.е. вы можете взять Source[Message]
и сопоставьте его с Source
любого типа по вашему выбору, если у вас есть функция от Message
к этому типу. Точно так же вы можете взять Sink[Message]
и сопоставить его с Sink
любого типа. А для Flow
вы можете сопоставить его с любым типом и снова сопоставить с Message
.
Например, вы могли бы
val outgoing = Source.maybe[Message]
val decoder: Flow[Message, String, NotUsed] = Flow[Message].mapConcat { m =>
m match {
case TextMessage.Strict(msg) => List(msg)
case _ => Nil
}
}
val stringProcessor: Flow[String, String, NotUsed] = Flow[String].map { s => s.replace(':', ';') }
val sink: Sink[String, Future[Done]] = Sink.foreach[Message] {
case TextMessage.Strict(text) => println(text)
}.contramap[String] { s => TextMessage.Strict(s) }
val websocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri))
val (upgradeResponse, closed) =
outgoing
.viaMat(websocketFlow)(Keep.right)
.viaMat(decoder)(Keep.left)
.viaMat(stringProcessor)(Keep.left)
.toMat(sink)(Keep.both)
.run()
Я бы, вероятно, скомбинировал websocketFlow
и decoder
в один поток от Message
до String
(или любой другой тип домена, применимый к бизнес-логам c (stringProcessor
в данном случае)), используя contramapped Sink
, как указано выше.
val stringsFromWebsocket: Flow[Message, String, Future[WebsocketUpgradeResponse]] =
Http()
.webSocketClientFlow(WebSocketRequest(uri))
.viaMat(decoder)(Keep.left)
val (upgradeResponse, closed) =
outgoing
.viaMat(stringsFromWebsocket)(Keep.right)
.viaMat(stringProcessor)(Keep.left)
.toMat(sink)(Keep.both)
.run()