Я тестирую интеграцию моего клиента Grp c с сервером Grp c. В этом случае я тестирую отказоустойчивость клиента, когда сервер прекращает работу и после него выполняется резервное копирование. Проблема, с которой я сталкиваюсь, заключается в том, что akka.http.scaladsl.ServerBinding#terminate
на самом деле не завершает работу сервера после завершения Future[HttpTerminated]
Для успешного завершения работы сервера Grp c мне пришлось завершить систему акторов, используемую для создания экземпляра сервер Grp c. После завершения работы сервера Grp c ActorSystem
мне пришлось использовать Thread.sleep()
, чтобы дождаться, пока новый сервер сможет обслуживать запросы. Все это кажется немного странным, поскольку я ожидал, что сервер завершится и запустится, как только фьючерсы будут завершены.
Вот код, чтобы обмануть эти сомнения:
Файл ScalaTest:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import testing.{GreeterService, HelloReply, HelloRequest}
import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import testing.GreeterServiceHandler
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import java.net.ServerSocket
import akka.grpc.GrpcClientSettings
import testing.GreeterServiceClient
import scala.concurrent.duration._
import io.grpc.StatusRuntimeException
class TestGRPCServer extends AsyncWordSpec with Matchers {
object RandomPort {
def apply(): Int = {
val socket = new ServerSocket(0)
socket.setReuseAddress(true)
val port = socket.getLocalPort
socket.close()
port
}
}
def greeterService(implicit mat: Materializer) = new GreeterService {
override def sayHello(in: HelloRequest): Future[HelloReply] = {
println(s"sayHello to ${in.name}")
Future.successful(HelloReply(s"Hello, ${in.name}"))
}
override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
println(s"sayHello to in stream...")
in.runWith(Sink.seq).map(elements => HelloReply(s"Hello, ${elements.map(_.name).mkString(", ")}"))
}
override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
println(s"sayHello to ${in.name} with stream of chars...")
Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString))
}
override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
println(s"sayHello to stream...")
in.map(request => HelloReply(s"Hello, ${request.name}"))
}
}
private def newGreeterService(maybePort: Option[Int] = None): Future[(ServerBinding, Int, ActorSystem)] = {
implicit val as: ActorSystem = ActorSystem("ServerSystem")
implicit val mat: Materializer = ActorMaterializer()
val service: HttpRequest => Future[HttpResponse] =
GreeterServiceHandler(greeterService)
val port = maybePort.getOrElse(RandomPort())
val binding = Http().bindAndHandleAsync(
service,
interface = "127.0.0.1",
port = port)
binding.foreach { binding => println(s"gRPC server bound to: ${binding.localAddress}") }
binding.map((_, port, as))
}
private def newClientService(port: Int): GreeterServiceClient = {
implicit val as: ActorSystem = ActorSystem("ClientSystem")
implicit val mat: Materializer = ActorMaterializer()
val clientSettings = GrpcClientSettings.connectToServiceAt("localhost", port).withTls(false)
GreeterServiceClient(clientSettings)
}
"GRPCServer" should {
"test grpc server" in {
val helloRequest = HelloRequest("test1")
val helloRequest2 = HelloRequest("test2")
(for {
(serverBinding, port, as1) <- newGreeterService()
client = newClientService(port)
resp <- client.sayHello(helloRequest)
_ <- as1.terminate()
_ <- recoverToSucceededIf[StatusRuntimeException](client.sayHello(helloRequest))
_ = println("foo")
(serverBinding2, _, as2) <- newGreeterService(Some(port))
_ = println("bar")
_ = Thread.sleep(3000)
_ = println("baz")
resp2 <- client.sayHello(helloRequest2)
_ = println("qux")
} yield (resp2.message)).map {message =>
message shouldBe "[Hello, test2]"
}
}
}
}
Файл Protobuf:
syntax = "proto3";
package testing;
service GreeterService {
//////////////////////
// Sends a greeting //
////////*****/////////
// HELLO //
////////*****/////////
rpc SayHello (HelloRequest) returns (HelloReply) {}
// Comment spanning
// on several lines
rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}
/*
* C style comments
*/
rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
/* C style comments
* on several lines
* with non-empty heading/trailing line */
rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
файл build.sbt:
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.0" % Test
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.6.6"
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.6.6"
libraryDependencies += "com.typesafe.akka" %% "akka-discovery" % "2.6.6"
enablePlugins(AkkaGrpcPlugin)