Как передать один и тот же источник на один или несколько HTTP-серверов и WebSockets? - PullRequest
0 голосов
/ 15 января 2019

Учитывая источник ByteStrings, мне нужно перенаправить поток на один или несколько серверов akka-http, а также записать каждую ByteString в один или несколько WebSockets.

Я пытался использовать график для широковещательной рассылки ByteStrings, но это привело к выполнению HTTP-push для каждой ByteString вместо потоковой передачи их на сервер.

Я также пытался использовать BroadcastHub.sink, чтобы получить повторно используемый источник, который можно использовать для создания потокового HttpRequest. Это работает, но кажется, что источник очищается или завершается, прежде чем данные читаются в некоторых случаях. Удаленные HTTP-серверы заканчивают с 0 или слишком маленькими байтами, если есть более быстрые локальные WebSockets, потребляющие данные.

Любые предложения о том, как сделать это надежным способом?

Например, вот часть кода, использующего BroadcastHub.sink:

val runnableGraph  = source.toMat(BroadcastHub.sink(bufferSize = 1))(Keep.right)
val reusableSource = runnableGraph.run()

def sendToRemoteServer(h: ServerInfo) = {
  Source
    .single(makeHttpRequest(streamName, h, reusableSource))
    .via(remoteConnections(h))
    .runWith(Sink.ignore)
}

val remoteSetF = remoteHostSet.map(sendToRemoteServer)

def sendToLocalWebSocket(a: AccessInfo) = {
  reusableSource
    .alsoTo(localSubscribers(a).sink)
    .mapAsync(1) { bs =>
      waitForAck(a).map(_ => ByteString.empty)
    }
    .runWith(Sink.ignore)
}

val localSetF = localSet.map(sendToLocalWebSocket)

Future.sequence(localSetF ++ remoteSetF).map(_ => Done)

Другое решение, которое я пробовал, - это использование графика, однако для потоковой передачи данных на HTTP-серверы мне нужно передать источник при создании HttpRequest, которого у меня нет внутри графика:

  // Create the graph
  source ~> bcast
  localFlows.foreach(bcast ~> _ ~> merge)
  remoteFlows.foreach(bcast ~> _ ~> merge)
  merge ~> out
  ClosedShape
...