AKKA HTTP + AKKA stream 100% загрузка процессора - PullRequest
0 голосов
/ 26 мая 2018

У меня есть веб-API, отображающий одну конечную точку GET с использованием Akka HTTP, и логика, в которой он принимает параметр от запрашивающей стороны, вызывает и вызывает внешний веб-сервис с помощью AKKA Streams и на основании полученного ответа отправляет запрос другой конечной точке, также используя akkaпоток.

первый вызов внешней конечной точки выглядит следующим образом

def poolFlow(uri: String): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
    Http().cachedHostConnectionPool[T](host = uri, 80)

def parseResponse(parallelism: Int): Flow[(Try[HttpResponse], T), (ByteString, T), NotUsed] =

    Flow[(Try[HttpResponse], T)].mapAsync(parallelism) {
      case (Success(HttpResponse(_, _, entity, _)), t) =>
        entity.dataBytes.alsoTo(Sink.ignore)
          .runFold(ByteString.empty)(_ ++ _)
          .map(e => e -> t)

      case (Failure(ex), _) => throw ex
    }

def parse(result: String, data: RequestShape): (Coord, Coord, String) =
    (data.src, data.dst, result)

val parseEntity: Flow[(ByteString, RequestShape), (Coord, Coord, String), NotUsed] =
    Flow[(ByteString, RequestShape)] map {
      case (entity, request) => parse(entity.utf8String, request)
    }

и потребителем потока

val routerResponse = httpRequests
  .map(buildHttpRequest)
  .via(RouterRequestProcessor.poolFlow(uri)).async
  .via(RouterRequestProcessor.parseResponse(2))
  .via(RouterRequestProcessor.parseEntity)
  .alsoTo(Sink.ignore)
  .runFold(Vector[(Coord, Coord, String)]()) {
    (acc, res) => acc :+ res
  }

routerResponse

затем я делаю некоторые вычисления на routerResponse и создаю сообщение для другого внешнеговеб-служба,

Второй внешний потребитель потока

def poolFlow(uri: String): Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), Http.HostConnectionPool] =
    Http().cachedHostConnectionPoolHttps[Unit](host = uri)

val parseEntity: Flow[(ByteString, Unit), (Unit.type, String), NotUsed] = Flow[(ByteString, Unit)] map {
    case (entity, _) => parse(entity.utf8String)
  }

  def parse(result: String): (Unit.type, String) = (Unit, result)

val res = Source.single(httpRequest)
      .via(DataRobotRequestProcessor.poolFlow(uri))
      .via(DataRobotRequestProcessor.parseResponse(1))
      .via(DataRobotRequestProcessor.parseEntity)
      .alsoTo(Sink.ignore)
      .runFold(List[String]()) {
        (acc, res) => acc :+ res._2
      }

Конечная точка Get получает первый поток и затем создает второй запрос на основе первого ответа,

Примечания:

  1. первый внешний сервис работает быстро в течение 1-2 секунд, а второй внешний сервис работает медленно в течение 3-4 секунд.

  2. первыйдля запроса конечной точки используется parallelism=2, а для запроса второй конечной точки - parallelism=1

  3. Служба работает в кластере AWS ECS, а для целей тестирования - водин узел

проблема,

в том, что веб-служба работает какое-то время, но загрузка ЦП возрастает за счет обработки большего количества запросов, я бы предположил, что это связано сзадний прессобязательно запускается, и процессор остается сильно загруженным после того, как не отправляется запрос, что странно.

Кто-нибудь знает, что происходит

...