У меня есть две службы: одна отправляет потоковые данные, а вторая получает их, используя akka-grpc для связи.Когда исходные данные предоставляются, Сервис 1 вызывается для обработки и отправки его Сервису 2 через клиент grpc.Вполне возможно, что несколько экземпляров одного сервера будут запущены одновременно, если одновременно предоставлено несколько исходных данных. В ходе длительного тестирования моего приложения.Я вижу ниже ошибку в сервисе один:
ERROR i.a.g.application.actors.DbActor - GraphStage [akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1@59d40805] terminated abruptly, caused by for example materializer or act
akka.stream.AbruptStageTerminationException: GraphStage [akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1@59d40805] terminated abruptly, caused by for example materializer or actor system termination.
Я никогда не выключал системы актеров, а только убивал актеров после выполнения их работы.Также я использовал proto3
и http2
для привязки запроса.Вот фрагмент моего кода в сервисе один:
////////////////////server http binding /////////
val service: HttpRequest => Future[HttpResponse] =
ServiceOneServiceHandler(new ServiceOneServiceImpl(system))
val bound = Http().bindAndHandleAsync(
service,
interface = config.getString("akka.grpc.server.interface"),
port = config.getString("akka.grpc.server.default-http-port").toInt,
connectionContext = HttpConnectionContext(http2 = Always))
bound.foreach { binding =>
logger.info(s"gRPC server bound to: ${binding.localAddress}")
}
////////////////////client /////////
def send2Server[A](data: ListBuffer[A]): Future[ResponseDTO] = {
val reply = {
val thisClient = interface.initialize()
interface.call(client = thisClient, req = data.asInstanceOf[ListBuffer[StoreRequest]].toList)
}
reply
}
///////////////// grpc communication //////////
def send2GrpcServer[A](data: ListBuffer[A]): Unit = {
val reply = send2Server(data)
Await.ready(reply, Duration.Inf) onComplete {
case util.Success(response: ResponseDTO) =>
logger.info(s"got reply message: ${res.description}")
//////check response content and stop application if desired result not found in response
}
case util.Failure(exp) =>
//////stop application
throw exp.getCause
}
}
Ошибка произошла точно после ожидания ответа сервиса 2:
Await.ready(reply, Duration.Inf)
Я не могупоймать причину ошибки.
ОБНОВЛЕНИЕ
Я обнаружил, что пропущен какой-то поток, так что служба 1 отправляет потоку неопределенное время ожидания ответа, а служба 2 - нет.получить что-нибудь, чтобы ответить на службу один, но все еще не знаю, почему пропущен поток, я также обновил плагин akka grpc, но не имеет смысла:
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "0.6.1")
addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4")