Сервер Akka Grp c на самом деле не завершает работу - PullRequest
0 голосов
/ 13 июля 2020

Я тестирую интеграцию моего клиента Grp c с сервером Grp c. В этом случае я тестирую отказоустойчивость клиента, когда сервер прекращает работу и после него выполняется резервное копирование. Проблема, с которой я сталкиваюсь, заключается в том, что akka.http.scaladsl.ServerBinding#terminate на самом деле не завершает работу сервера после завершения Future[HttpTerminated]

Для успешного завершения работы сервера Grp c мне пришлось завершить систему акторов, используемую для создания экземпляра сервер Grp c. После завершения работы сервера Grp c ActorSystem мне пришлось использовать Thread.sleep(), чтобы дождаться, пока новый сервер сможет обслуживать запросы. Все это кажется немного странным, поскольку я ожидал, что сервер завершится и запустится, как только фьючерсы будут завершены.

Вот код, чтобы обмануть эти сомнения:

Файл ScalaTest:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import testing.{GreeterService, HelloReply, HelloRequest}

import scala.concurrent.{ExecutionContext, Future}
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import testing.GreeterServiceHandler
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import java.net.ServerSocket
import akka.grpc.GrpcClientSettings
import testing.GreeterServiceClient
import scala.concurrent.duration._
import io.grpc.StatusRuntimeException


class TestGRPCServer extends AsyncWordSpec with Matchers {

  object RandomPort {
  def apply(): Int = {
    val socket = new ServerSocket(0)
    socket.setReuseAddress(true)
    val port = socket.getLocalPort
    socket.close()
    port
  }
}


  def greeterService(implicit mat: Materializer) = new GreeterService {

    override def sayHello(in: HelloRequest): Future[HelloReply] = {
      println(s"sayHello to ${in.name}")
      Future.successful(HelloReply(s"Hello, ${in.name}"))
    }

    override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
      println(s"sayHello to in stream...")
      in.runWith(Sink.seq).map(elements => HelloReply(s"Hello, ${elements.map(_.name).mkString(", ")}"))
    }

    override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
      println(s"sayHello to ${in.name} with stream of chars...")
      Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString))
    }

    override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
      println(s"sayHello to stream...")
      in.map(request => HelloReply(s"Hello, ${request.name}"))
    }
  }

  private def newGreeterService(maybePort: Option[Int] = None): Future[(ServerBinding, Int, ActorSystem)] = {
    implicit val as: ActorSystem = ActorSystem("ServerSystem")
    implicit val mat: Materializer = ActorMaterializer()
    val service: HttpRequest => Future[HttpResponse] =
      GreeterServiceHandler(greeterService)
    val port = maybePort.getOrElse(RandomPort())
    val binding = Http().bindAndHandleAsync(
      service,
      interface = "127.0.0.1",
      port = port)
    binding.foreach { binding => println(s"gRPC server bound to: ${binding.localAddress}") }

    binding.map((_, port, as))
  }

  private def newClientService(port: Int): GreeterServiceClient = {
    implicit val as: ActorSystem = ActorSystem("ClientSystem")
    implicit val mat: Materializer = ActorMaterializer()
    val clientSettings = GrpcClientSettings.connectToServiceAt("localhost", port).withTls(false)
    GreeterServiceClient(clientSettings)
  }

  "GRPCServer" should {
    "test grpc server" in {
      val helloRequest = HelloRequest("test1")
      val helloRequest2 = HelloRequest("test2")
      (for {
        (serverBinding, port, as1) <- newGreeterService()
        client                     =  newClientService(port)
        resp                       <- client.sayHello(helloRequest)
        _                          <- as1.terminate()
        _                          <- recoverToSucceededIf[StatusRuntimeException](client.sayHello(helloRequest))
        _ = println("foo")
        (serverBinding2, _, as2)   <- newGreeterService(Some(port))
        _ = println("bar")
        _ = Thread.sleep(3000)
        _ = println("baz")
        resp2                      <- client.sayHello(helloRequest2)
        _ = println("qux")
      } yield (resp2.message)).map {message =>
        message shouldBe "[Hello, test2]"
      }
    }
  }


}

Файл Protobuf:

syntax = "proto3";

package testing;

service GreeterService {
    //////////////////////
    // Sends a greeting //
    ////////*****/////////
    //      HELLO       //
    ////////*****/////////
    rpc SayHello (HelloRequest) returns (HelloReply) {}

    // Comment spanning
    // on several lines
    rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}

    /*
     * C style comments
     */
    rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}

    /* C style comments
     * on several lines
     * with non-empty heading/trailing line */
    rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}

message HelloRequest {
    string name = 1;
}

// The response message containing the greetings
message HelloReply {
    string message = 1;
}

файл build.sbt:

libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.0" % Test
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.6.6"
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.6.6"
libraryDependencies += "com.typesafe.akka" %% "akka-discovery" % "2.6.6"

enablePlugins(AkkaGrpcPlugin)

...