Веб-сокет на стороне клиента Akka неожиданно закрывается - PullRequest
1 голос
/ 10 мая 2019

У меня есть конечная точка websocket, которая отправляет текстовое сообщение клиенту каждую секунду.Клиент никогда не отправляет никаких сообщений на сервер.

Используя приведенный ниже код JS, он работает, как и ожидалось, он выходит из системы каждую секунду:

var ws = new WebSocket("ws://url_of_my_endpoint");
ws.onmessage = (message) => console.log(message.data);

Я хочу создатьаналогичный потребитель в Scala, использующий Akka HTTP.Я создал приведенный ниже код на основе официальных документов .

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher

val url = "ws://url_of_my_endpoint"

val outgoing: Source[Message, NotUsed] = Source.empty

val webSocketFlow =
  Http().webSocketClientFlow(WebSocketRequest(url))

val printSink: Sink[Message, Future[Done]] =
  Sink.foreach[Message] {
    case message: TextMessage.Strict =>
      println("message received: " + message.text)
    case _  => println("some other message")
  }

val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right)
    .toMat(printSink)(Keep.both)
    .run()

val connected = upgradeResponse.map { upgrade =>
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
    Done
  } else {
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
  }
}

connected.onComplete(_ => println("Connection established."))
closed.foreach(_ => println("Connection closed."))

Проблема заключается в том, что соединение закрывается через несколько секунд.Иногда через 1 секунду, иногда через 3-4 секунды.Клиент JS работает просто отлично, поэтому я предполагаю, что проблема не в сервере.

В чем проблема в коде?Как это должно быть изменено, чтобы оно сообщало мне, что пошло не так?

1 Ответ

1 голос
/ 10 мая 2019

Одна проблема заключается в том, что вы не отправляете никаких сообщений через поток:

val outgoing: Source[Message, NotUsed] = Source.empty

Попробуйте выполнить что-то вроде следующего, которое отправляет случайное значение TextMessage каждую секунду:

import scala.concurrent.duration._

val outgoing: Source[Message, NotUsed] =
  Source
    .fromIterator(() => Iterator.continually(TextMessage(scala.util.Random.nextInt().toString)))
    .throttle(1, 1 second)
...