Я использую Akka WebSockets для связи клиент-сервер.
На клиенте:
source
.viaMat(Http().webSocketClientFlow(httpReq, settings))(Keep.both)
.toMat(sink)(Keep.both)
.run()
, где источник равен
val source = Source.actorRef[Object](Int.MaxValue,OverflowStrategy.fail)
И приемник пересылает все сообщения на ClientActor
.
На сервере:
val requestHandler: HttpRequest => HttpResponse = {
case req @ HttpRequest(GET, Uri.Path("/"), _, _, _) =>
req.header[UpgradeToWebSocket] match {
case Some(upgrade: UpgradeToWebSocket) =>
/**
* Create an ActorRef for the client. Sending messages to this actor replies to the client.
* fanout = false, because there is only one output (no broadcasting).
*/
val source: Source[Object, ActorRef] = Source.actorRef[Object](Int.MaxValue, OverflowStrategy.fail)
val (clientActor: ActorRef, publisher) = source
.via(flow)
.toMat(Sink.asPublisher(fanout = false))(Keep.both).run()
// receives messages from the client
val sink: Sink[Message, NotUsed] = Common.getSink(messageHandler, clientActor)
upgrade.handleMessagesWithSinkSource(sink, Source.fromPublisher(publisher))
case None =>
HttpResponse(400, entity = "Not a valid websocket request!")
}
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
А sink
пересылает все сообщения на ServerActor
.
ServerActor
def receive: PartialFunction[Any, Unit] = {
case Terminated(sender) =>
logger.info(s"Detected that client disconnected: $sender")
Но когда я останавливаю clientActor с помощью clientActor ! PoisonPill
, серверу требуется ровно минут, чтобы обнаружить, что clientActor был отключен.Какие изменения мне нужно внести в clientActor или serverActor, чтобы обнаруживать разрыв непосредственно?Любые указатели будут оценены.
Отладка, которую я сделал:
Я попытался использовать system.context.terminate()
, и с этим ServerActor незамедлительно обнаруживает отключение, но затем он не позволяет мне создать новый ClientActor.Я сталкиваюсь с « Почему Akka завершается с ошибкой« IllegalStateException: не удается создать дочерние элементы при завершении или завершении »при тестировании с ScalaTest? ».
Я немного покопался и обнаружил, что Akka WebSocket использует streamSupervisor
актеров для общения, и оба clientActor
и serverActor
наблюдают за этими streamSupervisor
актерами.
Ожидание здесьдолжен обнаруживать разъединение клиента на актере serverActor
.