Я пытаюсь обернуть голову вокруг потоков akka и способа работы с веб-сокетами, но некоторые вещи мне совершенно понятны.Для начала я пытаюсь установить одностороннюю связь от одного клиента к серверу и связь между тем же сервером и другим клиентом.
client1 -----> Server <------> client2
Я смотрел на приведенный пример здесь .Результирующий код выглядит примерно так:
1) начиная с контроллера
class Test @Inject()(@Named("connManager") myConnectionsManager: ActorRef, cc: ControllerComponents)
(implicit val actorSystem: ActorSystem,
val mat: Materializer,
implicit val executionContext: ExecutionContext)
extends AbstractController(cc) {
private def wsFutureFlow(id: String): Future[Flow[String, String, NotUsed]] = {
implicit val timeout: Timeout = Timeout(5.seconds)
val future = myConnectionsManager ? CreateRemote(id)
val futureFlow = future.mapTo[Flow[String, String, NotUsed]]
futureFlow
}
private def wsFutureLocalFlow: Future[Flow[String, String, NotUsed]] = {
implicit val timeout: Timeout = Timeout(5.seconds)
val future = myConnectionsManager ? CreateLocal
val futureFlow = future.mapTo[Flow[String, String, NotUsed]]
futureFlow
}
def ws: WebSocket = WebSocket.acceptOrResult[String, String] {
rh =>
wsFutureFlow(rh.id.toString).map { flow =>
Right(flow)
}
}
def wsLocal: WebSocket = WebSocket.acceptOrResult[String, String] {
_ =>
wsFutureLocalFlow.map { flow =>
Right(flow)
}
}
}
Что касается актера диспетчера соединений.Это было бы эквивалентно UserParentActor из примера.
class MyConnectionsManager @Inject()(childFactory: MyTestActor.Factory)
(implicit ec: ExecutionContext, mat: Materializer) extends Actor with InjectedActorSupport {
import akka.pattern.{ask, pipe}
implicit val timeout: Timeout = Timeout(2.seconds)
override def receive: Receive = {
case CreateRemote(x) =>
val child = injectedChild(childFactory(), s"remote-$x")
context.watch(child)
privatePipe(child)
case CreateLocal =>
val child = injectedChild(childFactory(), "localConnection")
context.become(onLocalConnected(child))
privatePipe(child)
case Terminated(child) =>
println(s"${child.path.name} terminated...")
}
def onLocalConnected(local: ActorRef): Receive = {
case CreateRemote(x) =>
val child = injectedChild(childFactory(), s"remote-$x")
context.watch(child)
privatePipe(child)
case x: SendToLocal => local ! x
}
private def privatePipe(child: ActorRef) = {
val future = (child ? Init).mapTo[Flow[String, String, _]]
pipe(future) to sender()
() // compiler throws exception without this: non-unit value discarded
}
}
И MyTestActor выглядит так:
class MyTestActor @Inject()(implicit mat: Materializer, ec: ExecutionContext) extends Actor {
val source: Source[String, Sink[String, NotUsed]] = MergeHub.source[String]
.recoverWithRetries(-1, { case _: Exception => Source.empty })
private val jsonSink: Sink[String, Future[Done]] = Sink.foreach { json =>
println(s"${self.path.name} got message: $json")
context.parent ! SendToLocal(json)
}
private lazy val websocketFlow: Flow[String, String, NotUsed] = {
Flow.fromSinkAndSourceCoupled(jsonSink, source).watchTermination() { (_, termination) =>
val name = self.path.name
termination.foreach(_ => context.stop(self))
NotUsed
}
}
def receive: Receive = {
case Init =>
println(s"${self.path.name}: INIT")
sender ! websocketFlow
case SendToLocal(x) =>
println(s"Local got from remote: $x")
case msg: String => sender ! s"Actor got message: $msg"
}
}
Что я не понимаю, кроме как приемники и источникиНа самом деле связь с актерами заключается в следующем.Когда я запускаю свою систему, я посылаю несколько сообщений актеру.Однако после того, как я закрываю соединение с субъектом с именем remote и продолжаю посылать сообщения тому, который называется localConnection, сообщения отправляются в DeadLetters:
[info] Done compiling.
[info] 15:49:20.606 - play.api.Play - Application started (Dev)
localConnection: INIT
localConnection got message: test data
Local got from remote: test data
localConnection got message: hello world
Local got from remote: hello world
remote-133: INIT
remote-133 got message: hello world
Local got from remote: hello world
remote-133 got message: hello from remote
Local got from remote: hello from remote
[error] 15:50:24.449 - a.a.OneForOneStrategy - Monitored actor [Actor[akka://application/user/connManager/remote-133#-998945083]] terminated
akka.actor.DeathPactException: Monitored actor [Actor[akka://application/user/connManager/remote-133#-998945083]] terminated
deadLetters got message: hello local
Я предполагаю, что это происходит из-за исключения... Может кто-нибудь объяснить мне, почему сообщение отправляется в DeadLetters?Кроме того, я хотел бы знать, почему я продолжаю получать исключение компилятора без "()", возвращаемого в конце privatePipe
?
Кроме того, я должен делать что-то по-другому?