Перехватывать события Akka HTTP WebSocket при закрытии сокета в Scala - PullRequest
0 голосов
/ 08 июня 2018

Я использую Scala с Akka HTTP.

У меня есть серверное серверное приложение, основанное на Akka HTTP.Входящие сообщения WebSocket обрабатываются с помощью handleWebSocketMessages:

/**
  * Route for WebSocket request
  */
private val webSocketRoute: Route = pathSuffix(Constants.WSPROXY_WEBSOCKET_PATH_SUFFIX) {
    LOGGER.debug("() Web socket route")
    handleWebSocketMessages(wsRouteImpl)
}

/**
  * This method calls all registered handlers.
  *
  * @return flow for handleWebSocketMessages method
  */
private def wsRouteImpl: Flow[Message, Message, Any] = {
    LOGGER.debug("() wsRouteHandling")
    numOfClients += 1

    LOGGER.info(s"Client has connected, current number of clients: $numOfClients")

    var flow = Flow[Message].mapConcat {
  // Call specific handlers depending on message type
  ...
}

Мои клиенты WebSocket устанавливают двусторонние соединения с поддержкой активности.

Привязка выполняется с помощью:

val binding = Http().bindAndHandle(webSocketRoute, config.host, config.port)

Проблема в том, что мне нужно ввести обратный вызов для закрытого сокета (например, если клиент отключился) и уменьшить текущее количество клиентов, но я не могу найти никаких точек входа для этого.

Можно ли перехватить какое-то событие при закрытии сокета?

1 Ответ

0 голосов
/ 08 июня 2018

Использование watchTermination:

val numOfClients = new java.util.concurrent.atomic.AtomicInteger(0)

private val webSocketRoute: Route = pathSuffix(Constants.WSPROXY_WEBSOCKET_PATH_SUFFIX) {
  LOGGER.debug("() Web socket route")

  val wsFlow: Flow[Message, Message, Any] =
    wsRouteImpl.watchTermination() { (_, fut) =>
      numOfClients.incrementAndGet()
      LOGGER.info(s"Client has connected. Current number of clients: $numOfClients")

      fut onComplete {
        case Success(_) =>
          numOfClients.decrementAndGet()
          LOGGER.info(s"Client has disconnected. Current number of clients: $numOfClients")
        case Failure(ex) =>
          numOfClients.decrementAndGet()
          LOGGER.error(s"Disconnection failure (number of clients: $numOfClients): $ex")
      }
    }

  handleWebSocketMessages(wsFlow)
}

private def wsRouteImpl: Flow[Message, Message, Any] = ???
...