В настоящее время я пытаюсь создать соединение веб-сокета akka-http, которое может:
- транслировать всем подключенным клиентам
- отвечать на определенные запросы клиентов
Вот как я до сих пор создавал свой поток:
// keeps a list of all actors so I can broadcast to them
var actors: List[ActorRef] = Nil
private def wsFlow(implicit materializer: ActorMaterializer): Flow[ws.Message, ws.Message, NotUsed] = {
val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
.toMat(BroadcastHub.sink[String])(Keep.both)
.run()
// this never triggers
source.watchTermination() { (m, f) =>
f.onComplete(r => println("TERMINATION: " + r.toString))
actors = actors diff actor :: Nil
m
}
actors = actor :: actors
val wsHandler: Flow[ws.Message, ws.Message, NotUsed] =
Flow[ws.Message]
.merge(source)
.map {
case TextMessage.Strict(tm) => handleMessage(actor, tm)
case _ => TextMessage.Strict("Ignored message!")
}
wsHandler
}
def broadcast(msg: String): Unit = {
actors.foreach(_ ! TextMessage.Strict(msg))
}
- я надеюсь - последняя проблема, с которой я сталкиваюсь, заключается в том, что обратный вызов watchTermination
никогда не срабатывает (я никогда не получаю "TERMINATION:... "сообщение на моей консоли).Это почему?И как можно определить, когда клиент покидает (чтобы я мог удалить его из своего списка actors
)?