Связь через параллельный поток Grpc приводит к ошибке: AkkaNettyGrpcClientGraphStage - PullRequest
1 голос
/ 28 мая 2019

У меня есть две службы: одна отправляет потоковые данные, а вторая получает их, используя akka-grpc для связи.Когда исходные данные предоставляются, Сервис 1 вызывается для обработки и отправки его Сервису 2 через клиент grpc.Вполне возможно, что несколько экземпляров одного сервера будут запущены одновременно, если одновременно предоставлено несколько исходных данных. В ходе длительного тестирования моего приложения.Я вижу ниже ошибку в сервисе один:

ERROR i.a.g.application.actors.DbActor - GraphStage [akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1@59d40805] terminated abruptly, caused by for example materializer or act
  akka.stream.AbruptStageTerminationException: GraphStage [akka.grpc.internal.AkkaNettyGrpcClientGraphStage$$anon$1@59d40805] terminated abruptly, caused by for example materializer or actor system termination.

Я никогда не выключал системы актеров, а только убивал актеров после выполнения их работы.Также я использовал proto3 и http2 для привязки запроса.Вот фрагмент моего кода в сервисе один:

////////////////////server http binding /////////
 val service: HttpRequest => Future[HttpResponse] =
  ServiceOneServiceHandler(new ServiceOneServiceImpl(system))

val bound = Http().bindAndHandleAsync(
  service,
  interface = config.getString("akka.grpc.server.interface"),
  port = config.getString("akka.grpc.server.default-http-port").toInt,
  connectionContext = HttpConnectionContext(http2 = Always))

bound.foreach { binding =>
  logger.info(s"gRPC server bound to: ${binding.localAddress}")
}

////////////////////client /////////
def send2Server[A](data: ListBuffer[A]): Future[ResponseDTO] = {
val reply = {

      val thisClient = interface.initialize()
      interface.call(client = thisClient, req = data.asInstanceOf[ListBuffer[StoreRequest]].toList)

  }
  reply
}

///////////////// grpc communication //////////
def send2GrpcServer[A](data: ListBuffer[A]): Unit = {
val reply = send2Server(data)
Await.ready(reply, Duration.Inf) onComplete {
  case util.Success(response: ResponseDTO) =>
    logger.info(s"got reply message: ${res.description}")

    //////check response content and stop application if desired result not found in response
    }
  case util.Failure(exp) =>
    //////stop application
    throw exp.getCause
}

}

Ошибка произошла точно после ожидания ответа сервиса 2:

Await.ready(reply, Duration.Inf)

Я не могупоймать причину ошибки.

ОБНОВЛЕНИЕ

Я обнаружил, что пропущен какой-то поток, так что служба 1 отправляет потоку неопределенное время ожидания ответа, а служба 2 - нет.получить что-нибудь, чтобы ответить на службу один, но все еще не знаю, почему пропущен поток, я также обновил плагин akka grpc, но не имеет смысла:

addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "0.6.1")

addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4") 
...