Как отключить буферизацию сообщений на сервере Akka WebSocket? - PullRequest
0 голосов
/ 18 мая 2018

У меня очень простой сервер Akka WebSocket, который передает строки из файла на подключенный клиент с интервалом 400 мс на строку.Все работает нормально, за исключением того факта, что веб-сервер, похоже, буферизует сообщения в течение примерно минуты, прежде чем транслировать их.

Поэтому, когда клиент подключается, я вижу на стороне сервера, что каждые 400 мс строка читается иподтолкнул к Sink, но на стороне клиента я ничего не получаю в течение минуты, а затем пакет из около 150 сообщений (соответствует минуте сообщений).

Есть ли параметр, который я пропускаю?

object WebsocketServer extends App {
  implicit val actorSystem = ActorSystem("WebsocketServer")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher

  val file = Paths.get("websocket-server/src/main/resources/EURUSD.txt")
  val fileSource =
    FileIO.fromPath(file)
      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))

  val delayedSource: Source[Strict, Future[IOResult]] =
    fileSource
      .map { line =>
        Thread.sleep(400)
        println(line.utf8String)
        TextMessage(line.utf8String)
      }

  def route = path("") {
    extractUpgradeToWebSocket { upgrade =>
      complete(upgrade.handleMessagesWithSinkSource(
        Sink.ignore,
        delayedSource)
      )
    }
  }

  val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

  bindingFuture.onComplete {
    case Success(binding) ⇒
      println(s"Server is listening on ws://localhost:8080")
    case Failure(e) ⇒
      println(s"Binding failed with ${e.getMessage}")
      actorSystem.terminate()
  }
}

1 Ответ

0 голосов
/ 21 мая 2018

Таким образом, подход с Thread.sleep(400) был неправильным.Я должен был использовать механику .throttle на источниках:

val delayedSource: Source[Strict, Future[IOResult]] =
    fileSource
      .throttle(elements = 1, per = 400.millis)
      .map { line =>
        println(line.utf8String)
        TextMessage(line.utf8String)
      }

Это решило проблему.

...