handleMessagesWithSinkSource реализован как:
/**
* The high-level interface to create a WebSocket server based on "messages".
*
* Returns a response to return in a request handler that will signal the
* low-level HTTP implementation to upgrade the connection to WebSocket and
* use the supplied inSink to consume messages received from the client and
* the supplied outSource to produce message to sent to the client.
*
* Optionally, a subprotocol out of the ones requested by the client can be chosen.
*/
def handleMessagesWithSinkSource(
inSink: Graph[SinkShape[Message], Any],
outSource: Graph[SourceShape[Message], Any],
subprotocol: Option[String] = None): HttpResponse =
handleMessages(Flow.fromSinkAndSource(inSink, outSource), subprotocol)
Это означает, что приемник и источник независимы, и действительно, источник должен продолжать создавать элементы, даже когда клиент закрывает входящую сторону соединения. Однако он должен остановиться, когда клиент полностью сбросит соединение.
Чтобы прекратить создание исходящих данных, как только входящее соединение закрыто, вы можете использовать Flow.fromSinkAndSourceCoupled
, поэтому:
val socket = upgrade.handleMessages(
Flow.fromSinkAndSourceCoupled(inSink, outSource)
subprotocol = None
)