Play framework akka stream Обработка сообщений веб-сокетов отправляется в deadletters - PullRequest
0 голосов
/ 28 сентября 2018

Я пытаюсь обернуть голову вокруг потоков akka и способа работы с веб-сокетами, но некоторые вещи мне совершенно понятны.Для начала я пытаюсь установить одностороннюю связь от одного клиента к серверу и связь между тем же сервером и другим клиентом.

client1 -----> Server <------> client2

Я смотрел на приведенный пример здесь .Результирующий код выглядит примерно так:

1) начиная с контроллера

class Test @Inject()(@Named("connManager") myConnectionsManager: ActorRef, cc: ControllerComponents)
                (implicit val actorSystem: ActorSystem,
                 val mat: Materializer,
                 implicit val executionContext: ExecutionContext)
extends AbstractController(cc) {


private def wsFutureFlow(id: String): Future[Flow[String, String, NotUsed]] = {
    implicit val timeout: Timeout = Timeout(5.seconds)
    val future = myConnectionsManager ? CreateRemote(id)
    val futureFlow = future.mapTo[Flow[String, String, NotUsed]]
    futureFlow
}

private def wsFutureLocalFlow: Future[Flow[String, String, NotUsed]] =   {
    implicit val timeout: Timeout = Timeout(5.seconds)
    val future = myConnectionsManager ? CreateLocal
    val futureFlow = future.mapTo[Flow[String, String, NotUsed]]
    futureFlow
}

def ws: WebSocket = WebSocket.acceptOrResult[String, String] {

    rh =>
        wsFutureFlow(rh.id.toString).map { flow =>
            Right(flow)
        }
}

def wsLocal: WebSocket = WebSocket.acceptOrResult[String, String] {

    _ =>
        wsFutureLocalFlow.map { flow =>
            Right(flow)
        }
}
}

Что касается актера диспетчера соединений.Это было бы эквивалентно UserParentActor из примера.

class MyConnectionsManager @Inject()(childFactory: MyTestActor.Factory)
                                (implicit ec: ExecutionContext, mat: Materializer) extends Actor with InjectedActorSupport {

import akka.pattern.{ask, pipe}

implicit val timeout: Timeout = Timeout(2.seconds)

override def receive: Receive = {
    case CreateRemote(x) =>
        val child = injectedChild(childFactory(), s"remote-$x")
        context.watch(child)
        privatePipe(child)
    case CreateLocal =>
        val child = injectedChild(childFactory(), "localConnection")
        context.become(onLocalConnected(child))
        privatePipe(child)
    case Terminated(child) =>
        println(s"${child.path.name} terminated...")
}

def onLocalConnected(local: ActorRef): Receive = {
    case CreateRemote(x) =>
        val child = injectedChild(childFactory(), s"remote-$x")
        context.watch(child)
        privatePipe(child)
    case x: SendToLocal => local ! x
}

private def privatePipe(child: ActorRef) = {
    val future = (child ? Init).mapTo[Flow[String, String, _]]
    pipe(future) to sender()
    () // compiler throws exception without this: non-unit value discarded
    }
}

И MyTestActor выглядит так:

class MyTestActor @Inject()(implicit mat: Materializer, ec: ExecutionContext) extends Actor {

val source: Source[String, Sink[String, NotUsed]] = MergeHub.source[String]
  .recoverWithRetries(-1, { case _: Exception => Source.empty })

private val jsonSink: Sink[String, Future[Done]] = Sink.foreach { json =>
    println(s"${self.path.name} got message:  $json")

    context.parent ! SendToLocal(json)
}

private lazy val websocketFlow: Flow[String, String, NotUsed] = {
    Flow.fromSinkAndSourceCoupled(jsonSink, source).watchTermination() { (_, termination) =>
        val name = self.path.name
        termination.foreach(_ => context.stop(self))
        NotUsed
    }
}

def receive: Receive = {

    case Init =>
        println(s"${self.path.name}: INIT")
        sender ! websocketFlow
    case SendToLocal(x) =>
        println(s"Local got from remote: $x")
    case msg: String => sender ! s"Actor got message: $msg"
    }
}

Что я не понимаю, кроме как приемники и источникиНа самом деле связь с актерами заключается в следующем.Когда я запускаю свою систему, я посылаю несколько сообщений актеру.Однако после того, как я закрываю соединение с субъектом с именем remote и продолжаю посылать сообщения тому, который называется localConnection, сообщения отправляются в DeadLetters:

[info] Done compiling.
[info] 15:49:20.606 - play.api.Play - Application started (Dev)
localConnection: INIT
localConnection got message:  test data
Local got from remote: test data
localConnection got message:  hello world
Local got from remote: hello world
remote-133: INIT
remote-133 got message:  hello world
Local got from remote: hello world
remote-133 got message:  hello from remote
Local got from remote: hello from remote
[error] 15:50:24.449 - a.a.OneForOneStrategy - Monitored actor [Actor[akka://application/user/connManager/remote-133#-998945083]] terminated
akka.actor.DeathPactException: Monitored actor [Actor[akka://application/user/connManager/remote-133#-998945083]] terminated
deadLetters got message:  hello local   

Я предполагаю, что это происходит из-за исключения... Может кто-нибудь объяснить мне, почему сообщение отправляется в DeadLetters?Кроме того, я хотел бы знать, почему я продолжаю получать исключение компилятора без "()", возвращаемого в конце privatePipe?

Кроме того, я должен делать что-то по-другому?

1 Ответ

0 голосов
/ 29 сентября 2018

Я понял, что генерируется исключение, потому что я забыл обработать сообщение Ter прекращено в новом поведении субъекта MyConnectionsManager.

def onLocalConnected(local: ActorRef): Receive = {
    case CreateRemote(x) =>
        val child = injectedChild(childFactory(), s"remote-$x")
        context.watch(child)
        privatePipe(child)
    case Terminated(child) => println(s"${child.path.name} terminated...")
    case x: SendToLocal => local ! x
}

Кажется, сейчас работает.

...