Как немедленно уведомить сервер Akka WebSocket, что клиент WebSocket остановился - PullRequest
0 голосов
/ 04 июня 2019

Я использую Akka WebSockets для связи клиент-сервер.

На клиенте:

source
    .viaMat(Http().webSocketClientFlow(httpReq, settings))(Keep.both)
    .toMat(sink)(Keep.both)
    .run()

, где источник равен

val source = Source.actorRef[Object](Int.MaxValue,OverflowStrategy.fail)

И приемник пересылает все сообщения на ClientActor.

На сервере:

val requestHandler: HttpRequest => HttpResponse = {
      case req @ HttpRequest(GET, Uri.Path("/"), _, _, _) =>
        req.header[UpgradeToWebSocket] match {
          case Some(upgrade: UpgradeToWebSocket) =>

            /**
              * Create an ActorRef for the client. Sending messages to this actor replies to the client.
              * fanout = false, because there is only one output (no broadcasting).
              */
            val source: Source[Object, ActorRef] = Source.actorRef[Object](Int.MaxValue, OverflowStrategy.fail)

            val (clientActor: ActorRef, publisher) = source
              .via(flow)
              .toMat(Sink.asPublisher(fanout = false))(Keep.both).run()

            // receives messages from the client

            val sink: Sink[Message, NotUsed] = Common.getSink(messageHandler, clientActor)

            upgrade.handleMessagesWithSinkSource(sink, Source.fromPublisher(publisher))

          case None =>
            HttpResponse(400, entity = "Not a valid websocket request!")
        }
      case r: HttpRequest =>
        r.discardEntityBytes() // important to drain incoming HTTP Entity stream
        HttpResponse(404, entity = "Unknown resource!")
    }

А sink пересылает все сообщения на ServerActor.

ServerActor

def receive: PartialFunction[Any, Unit] = {
    case Terminated(sender) =>
      logger.info(s"Detected that client disconnected: $sender")

Но когда я останавливаю clientActor с помощью clientActor ! PoisonPill, серверу требуется ровно минут, чтобы обнаружить, что clientActor был отключен.Какие изменения мне нужно внести в clientActor или serverActor, чтобы обнаруживать разрыв непосредственно?Любые указатели будут оценены.

Отладка, которую я сделал:

Я попытался использовать system.context.terminate(), и с этим ServerActor незамедлительно обнаруживает отключение, но затем он не позволяет мне создать новый ClientActor.Я сталкиваюсь с « Почему Akka завершается с ошибкой« IllegalStateException: не удается создать дочерние элементы при завершении или завершении »при тестировании с ScalaTest? ».

Я немного покопался и обнаружил, что Akka WebSocket использует streamSupervisor актеров для общения, и оба clientActor и serverActor наблюдают за этими streamSupervisor актерами.

Ожидание здесьдолжен обнаруживать разъединение клиента на актере serverActor.

1 Ответ

0 голосов
/ 08 июня 2019

Вы можете использовать Sink.actorRef для стока потока.После того, как соединение будет прервано клиентом, субъект получит это сообщение, и вы можете отравить дочернего субъекта, соответствующего WebSocket.

...