Akka TCP-запрос ответа нужен Thread.sleep - PullRequest
0 голосов
/ 07 февраля 2019

Доброе утро, у меня проблема с базовым вводом-выводом akka по TCP

У меня есть базовая реализация Client и Server, как показано в документации по akka:

Client is https://github.com/akka/akka/blob/v2.5.20/akka-docs/src/test/scala/docs/io/IODocSpec.scala#L67-L103

И Handler - это [SimpleEchoHandler] (https://github.com/akka/akka/blob/v2.5.20/akka-docs/src/test/scala/docs/io/EchoServer.scala#L227-L304), но другой действует аналогично.

У меня есть основной метод тестирования, который останавливается при первом подключении к серверу:

package core.september

import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import core.september.fastchain.network.Client

/**
 * @author ${user.name}
 */
object App {

  class ClientHandler extends Actor with ActorLogging {
    def receive = {
      case toLog ⇒ {
        log.debug("Client received "+ toLog.toString)
      }
        //Thread.sleep(200)
    }
  }


  def main(args : Array[String]) {
val config = ConfigFactory.parseString("akka.loglevel = DEBUG")
    implicit val system = ActorSystem("EchoServer", config)

var clientHand:ActorRef  = system.actorOf(Props(classOf[ClientHandler]))
var address:InetSocketAddress = new InetSocketAddress("localhost",5080)


var ackServer = system.actorOf(Props(classOf[EchoManager], classOf[SimpleEchoHandler],5080), "simple")
var client:ActorRef = system.actorOf(Props(classOf[Client],address,clientHand));

//Thread.sleep(200)

client ! ByteString("echo")

//Thread.sleep(200)

client ! "close"


  }

}

Если я не закомментирую два Thread.sleep после каждого сообщения, я не могу увидеть вывод отправленного сообщения, вывод без сна просто:

[DEBUG] [02/07/2019 15:47:21.812] [EchoServer-akka.actor.default-dispatcher-4] [akka://EchoServer/system/IO-TCP/selectors/$a/0] Attempting connection to [localhost/127.0.0.1:5080]
[DEBUG] [02/07/2019 15:47:21.816] [EchoServer-akka.actor.default-dispatcher-4] [akka://EchoServer/system/IO-TCP/selectors/$a/0] Connection established to [localhost/127.0.0.1:5080]
[DEBUG] [02/07/2019 15:47:21.825] [EchoServer-akka.actor.default-dispatcher-3] [akka://EchoServer/user/$a] Client received Connected(localhost/127.0.0.1:5080,/127.0.0.1:54616)

Я полностью потерял сообщение ByteString и сообщение "close". Мой вопрос заключается в том, почему мне нужно перевести основной поток в режим сна, чтобы показать также другие сообщения. С помощью сообщения thread.sleep правильно регистрируются сообщения:

[DEBUG] [02/07/2019 15:53:55.988] [EchoServer-akka.actor.default-dispatcher-5] [akka://EchoServer/system/IO-TCP/selectors/$a/0] Attempting connection to [localhost/127.0.0.1:5080]
[DEBUG] [02/07/2019 15:53:55.999] [EchoServer-akka.actor.default-dispatcher-5] [akka://EchoServer/system/IO-TCP/selectors/$a/0] Connection established to [localhost/127.0.0.1:5080]
[DEBUG] [02/07/2019 15:53:56.011] [EchoServer-akka.actor.default-dispatcher-5] [akka://EchoServer/user/$a] Client received Connected(localhost/127.0.0.1:5080,/127.0.0.1:54712)
[DEBUG] [02/07/2019 15:53:56.157] [EchoServer-akka.actor.default-dispatcher-2] [akka://EchoServer/user/$a] Client received ByteString(101, 99, 104, 111)
[DEBUG] [02/07/2019 15:53:56.374] [EchoServer-akka.actor.default-dispatcher-4] [akka://EchoServer/user/$a] Client received connection closed

Реализация ClientActor:

package core.september.fastchain.network

import akka.actor.{ Actor, ActorRef, Props }
import akka.io.{ IO, Tcp }
import akka.util.ByteString
import java.net.InetSocketAddress

object Client {
  def props(remote: InetSocketAddress, replies: ActorRef) =
    Props(classOf[Client], remote, replies)
}

class Client(remote: InetSocketAddress, listener: ActorRef) extends Actor {

  import Tcp._
  import context.system

  import akka.io.Tcp

  /*if (listener == null) {
   listener = Tcp.get(context.system).manager
  }*/

  IO(Tcp) ! Connect(remote)

  def  receive =  {
case CommandFailed(_: Connect) ⇒
  listener ! "connect failed"
      context stop self

case c @ Connected(remote, local) ⇒
  listener ! c
  val connection = sender()
  connection ! Register(self)
  context become {
    case data: ByteString ⇒
      connection ! Write(data)
    case CommandFailed(w: Write) ⇒
      // O/S buffer was full
      listener ! "write failed"
    case Received(data) ⇒
      listener ! data
    case "close" ⇒
      connection ! Close
    case _: ConnectionClosed ⇒
      listener ! "connection closed"
      context stop self
  }
  }
}

большое спасибо.

1 Ответ

0 голосов
/ 07 февраля 2019

Вам нужно подождать, пока актер обработает ваши сообщения, прежде чем выходить из приложения.Самый простой способ - использовать шаблон gracefulStop Акки:

import akka.pattern.gracefulStop
client ! ByteString("echo")
client ! "close"
Await.result(gracefulStop(client, 1 second)(system)
...