Учитывая источник ByteStrings, мне нужно перенаправить поток на один или несколько серверов akka-http, а также записать каждую ByteString в один или несколько WebSockets.
Я пытался использовать график для широковещательной рассылки ByteStrings, но это привело к выполнению HTTP-push для каждой ByteString вместо потоковой передачи их на сервер.
Я также пытался использовать BroadcastHub.sink, чтобы получить повторно используемый источник, который можно использовать для создания потокового HttpRequest. Это работает, но кажется, что источник очищается или завершается, прежде чем данные читаются в некоторых случаях. Удаленные HTTP-серверы заканчивают с 0 или слишком маленькими байтами, если есть более быстрые локальные WebSockets, потребляющие данные.
Любые предложения о том, как сделать это надежным способом?
Например, вот часть кода, использующего BroadcastHub.sink:
val runnableGraph = source.toMat(BroadcastHub.sink(bufferSize = 1))(Keep.right)
val reusableSource = runnableGraph.run()
def sendToRemoteServer(h: ServerInfo) = {
Source
.single(makeHttpRequest(streamName, h, reusableSource))
.via(remoteConnections(h))
.runWith(Sink.ignore)
}
val remoteSetF = remoteHostSet.map(sendToRemoteServer)
def sendToLocalWebSocket(a: AccessInfo) = {
reusableSource
.alsoTo(localSubscribers(a).sink)
.mapAsync(1) { bs =>
waitForAck(a).map(_ => ByteString.empty)
}
.runWith(Sink.ignore)
}
val localSetF = localSet.map(sendToLocalWebSocket)
Future.sequence(localSetF ++ remoteSetF).map(_ => Done)
Другое решение, которое я пробовал, - это использование графика, однако для потоковой передачи данных на HTTP-серверы мне нужно передать источник при создании HttpRequest, которого у меня нет внутри графика:
// Create the graph
source ~> bcast
localFlows.foreach(bcast ~> _ ~> merge)
remoteFlows.foreach(bcast ~> _ ~> merge)
merge ~> out
ClosedShape