watchTermination не запускается в потоке akka-http - PullRequest
0 голосов
/ 04 февраля 2019

В настоящее время я пытаюсь создать соединение веб-сокета akka-http, которое может:

  • транслировать всем подключенным клиентам
  • отвечать на определенные запросы клиентов

Вот как я до сих пор создавал свой поток:


// keeps a list of all actors so I can broadcast to them
var actors: List[ActorRef] = Nil

private def wsFlow(implicit materializer: ActorMaterializer): Flow[ws.Message, ws.Message, NotUsed] = {
    val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
      .toMat(BroadcastHub.sink[String])(Keep.both)
      .run()

    // this never triggers
    source.watchTermination() { (m, f) =>
      f.onComplete(r => println("TERMINATION: " + r.toString))
      actors = actors diff actor :: Nil
      m
    }

    actors = actor :: actors

    val wsHandler: Flow[ws.Message, ws.Message, NotUsed] =
      Flow[ws.Message]
        .merge(source)
        .map {
          case TextMessage.Strict(tm) => handleMessage(actor, tm)
          case _ => TextMessage.Strict("Ignored message!")
        }
    wsHandler
  }

  def broadcast(msg: String): Unit = {
    actors.foreach(_ ! TextMessage.Strict(msg))
  }

- я надеюсь - последняя проблема, с которой я сталкиваюсь, заключается в том, что обратный вызов watchTermination никогда не срабатывает (я никогда не получаю "TERMINATION:... "сообщение на моей консоли).Это почему?И как можно определить, когда клиент покидает (чтобы я мог удалить его из своего списка actors)?

1 Ответ

0 голосов
/ 04 февраля 2019

Я разобрался, как это сделать:

val wsHandler: Flow[ws.Message, ws.Message, NotUsed] = Flow[ws.Message]
  .watchTermination() { (m, f) =>
    f.onComplete(r => {
      println("Client left: " + r.toString)
      actors = actors diff actor :: Nil
      }
    )
    m
  }
  .merge(source)
  .map {
    case TextMessage.Strict(tm) => handleMessage(actor, tm)
    case _ => TextMessage.Strict("Ignored message!")
  }
...