У меня есть веб-API, отображающий одну конечную точку GET с использованием Akka HTTP, и логика, в которой он принимает параметр от запрашивающей стороны, вызывает и вызывает внешний веб-сервис с помощью AKKA Streams и на основании полученного ответа отправляет запрос другой конечной точке, также используя akkaпоток.
первый вызов внешней конечной точки выглядит следующим образом
def poolFlow(uri: String): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
Http().cachedHostConnectionPool[T](host = uri, 80)
def parseResponse(parallelism: Int): Flow[(Try[HttpResponse], T), (ByteString, T), NotUsed] =
Flow[(Try[HttpResponse], T)].mapAsync(parallelism) {
case (Success(HttpResponse(_, _, entity, _)), t) =>
entity.dataBytes.alsoTo(Sink.ignore)
.runFold(ByteString.empty)(_ ++ _)
.map(e => e -> t)
case (Failure(ex), _) => throw ex
}
def parse(result: String, data: RequestShape): (Coord, Coord, String) =
(data.src, data.dst, result)
val parseEntity: Flow[(ByteString, RequestShape), (Coord, Coord, String), NotUsed] =
Flow[(ByteString, RequestShape)] map {
case (entity, request) => parse(entity.utf8String, request)
}
и потребителем потока
val routerResponse = httpRequests
.map(buildHttpRequest)
.via(RouterRequestProcessor.poolFlow(uri)).async
.via(RouterRequestProcessor.parseResponse(2))
.via(RouterRequestProcessor.parseEntity)
.alsoTo(Sink.ignore)
.runFold(Vector[(Coord, Coord, String)]()) {
(acc, res) => acc :+ res
}
routerResponse
затем я делаю некоторые вычисления на routerResponse и создаю сообщение для другого внешнеговеб-служба,
Второй внешний потребитель потока
def poolFlow(uri: String): Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), Http.HostConnectionPool] =
Http().cachedHostConnectionPoolHttps[Unit](host = uri)
val parseEntity: Flow[(ByteString, Unit), (Unit.type, String), NotUsed] = Flow[(ByteString, Unit)] map {
case (entity, _) => parse(entity.utf8String)
}
def parse(result: String): (Unit.type, String) = (Unit, result)
val res = Source.single(httpRequest)
.via(DataRobotRequestProcessor.poolFlow(uri))
.via(DataRobotRequestProcessor.parseResponse(1))
.via(DataRobotRequestProcessor.parseEntity)
.alsoTo(Sink.ignore)
.runFold(List[String]()) {
(acc, res) => acc :+ res._2
}
Конечная точка Get получает первый поток и затем создает второй запрос на основе первого ответа,
Примечания:
первый внешний сервис работает быстро в течение 1-2 секунд, а второй внешний сервис работает медленно в течение 3-4 секунд.
первыйдля запроса конечной точки используется parallelism=2
, а для запроса второй конечной точки - parallelism=1
Служба работает в кластере AWS ECS, а для целей тестирования - водин узел
проблема,
в том, что веб-служба работает какое-то время, но загрузка ЦП возрастает за счет обработки большего количества запросов, я бы предположил, что это связано сзадний прессобязательно запускается, и процессор остается сильно загруженным после того, как не отправляется запрос, что странно.
Кто-нибудь знает, что происходит