Остановить источник потока Akka, когда клиент закрывает соединение через веб-сокет - PullRequest
0 голосов
/ 08 января 2019

У меня есть akka http web socket Route с кодом, похожим на:

private val wsReader: Route = путь ("v1" / "данные" / "ws") { log.info («Открытие подключения веб-сокета ...»)

  val testSource = Source
    .repeat("Hello")
    .throttle(1, 1.seconds)
    .map(x => {
      println(x)
      x
    })
    .map(TextMessage.Strict)
    .limit(1000)

  extractUpgradeToWebSocket { upgrade ⇒
    complete(upgrade.handleMessagesWithSinkSource(Sink.ignore, testSource))
  }
}

Все отлично работает (я получаю от клиента 1 тестовое сообщение каждую секунду). Единственная проблема заключается в том, что я не понимаю, как остановить / закрыть Source (testSource), если клиент закрывает соединение через веб-сокет.

Вы можете видеть, что источник продолжает создавать элементы (см. println), даже если веб-сокет не работает.

Как я могу обнаружить отключение клиента?

Ответы [ 2 ]

0 голосов
/ 09 января 2019

handleMessagesWithSinkSource реализован как:

/**
 * The high-level interface to create a WebSocket server based on "messages".
 *
 * Returns a response to return in a request handler that will signal the
 * low-level HTTP implementation to upgrade the connection to WebSocket and
 * use the supplied inSink to consume messages received from the client and
 * the supplied outSource to produce message to sent to the client.
 *
 * Optionally, a subprotocol out of the ones requested by the client can be chosen.
 */
def handleMessagesWithSinkSource(
  inSink:      Graph[SinkShape[Message], Any],
  outSource:   Graph[SourceShape[Message], Any],
  subprotocol: Option[String]                   = None): HttpResponse =

  handleMessages(Flow.fromSinkAndSource(inSink, outSource), subprotocol)

Это означает, что приемник и источник независимы, и действительно, источник должен продолжать создавать элементы, даже когда клиент закрывает входящую сторону соединения. Однако он должен остановиться, когда клиент полностью сбросит соединение.

Чтобы прекратить создание исходящих данных, как только входящее соединение закрыто, вы можете использовать Flow.fromSinkAndSourceCoupled, поэтому:

val socket = upgrade.handleMessages(
  Flow.fromSinkAndSourceCoupled(inSink, outSource)
  subprotocol = None
)
0 голосов
/ 08 января 2019

Одним из способов является использование KillSwitches для обработки выключения testSource.

private val wsReader: Route =
path("v1" / "data" / "ws") {
  logger.info("Opening websocket connecting ...")

  val sharedKillSwitch = KillSwitches.shared("my-kill-switch")

  val testSource =
    Source
     .repeat("Hello")
     .throttle(1, 1.seconds)
     .map(x => {
       println(x)
       x
     })
    .map(TextMessage.Strict)
    .limit(1000)
    .via(sharedKillSwitch.flow)

  extractUpgradeToWebSocket { upgrade ⇒
    val inSink = Sink.onComplete(_ => sharedKillSwitch.shutdown())
    val outSource = testSource
    val socket = upgrade.handleMessagesWithSinkSource(inSink, outSource)

    complete(socket)
  }
}
...