У меня есть сервис akka-http, и я пробую разъем alpakka s3 для загрузки файлов. Ранее я использовал временный файл, а затем загружал его с помощью Amazon SDK. Этот подход потребовал внесения некоторых изменений в Amazon SDK, чтобы сделать его более похожим на scala, но он мог обрабатывать даже 1000 запросов одновременно. Пропускная способность не была удивительной, но все запросы в конечном итоге прошли. Вот код до изменений, без альпакки:
`` `
path("uploadfile") {
withRequestTimeout(20.seconds) {
storeUploadedFile("csv", tempDestination) {
case (metadata, file) =>
val uploadFuture = upload(file, file.toPath.getFileName.toString)
onComplete(uploadFuture) {
case Success(_) => complete(StatusCodes.OK)
case Failure(_) => complete(StatusCodes.FailedDependency)
}
}
}
}
}
case class S3UploaderException(msg: String) extends Exception(msg)
def upload(file: File, key: String): Future[String] = {
val s3Client = AmazonS3ClientBuilder.standard()
.withCredentials(new DefaultAWSCredentialsProviderChain())
.withRegion(Regions.EU_WEST_3)
.build()
val promise = Promise[String]()
val listener = new ProgressListener() {
override def progressChanged(progressEvent: ProgressEvent): Unit = {
(progressEvent.getEventType: @unchecked) match {
case ProgressEventType.TRANSFER_FAILED_EVENT => promise.failure(S3UploaderException(s"Uploading a file with a key: $key"))
case ProgressEventType.TRANSFER_COMPLETED_EVENT |
ProgressEventType.TRANSFER_CANCELED_EVENT => promise.success(key)
}
}
}
val request = new PutObjectRequest("S3_BUCKET", key, file)
request.setGeneralProgressListener(listener)
s3Client.putObject(request)
promise.future
}
`` `
Когда я изменил это, чтобы использовать соединитель alpakka, код выглядит намного лучше, поскольку мы можем просто соединить ByteSource
и alpakka Sink
вместе. Однако этот подход не может справиться с такой большой нагрузкой. Когда я выполняю 1000 запросов одновременно (файлы 10 КБ), проходит менее 10%, а остальное не выполняется, за исключением:
akka.stream.alpakka.s3.impl.FailedUpload: Превышено настроено
значение max-open-запросы [32]. Это означает, что очередь запросов
этот бассейн
(HostConnectionPoolSetup (bargain-test.s3-eu-west-3.amazonaws.com, 443, ConnectionPoolSetup (ConnectionPoolSettings (4,0,5,32,1,30
секунд, ClientConnectionSettings (Some (User-Agent: akka-http / 10.1.3), 10
секунд, 1
минута, 512, None, WebSocketSettings (, пинг, Duration.Inf, akka.http.impl.settings.WebSocketSettingsImpl $$$ Lambda $ 4787/1279590204 @ 4d809f4c), Список (), ParserSettings (2048,16,64,64,8192 , 64,8388608,256,1048576, Строгий, RFC6265, правда, Set (), Full, Error, Карта (If-Range
-> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, по умолчанию -> 12, Content-MD5 -> 0, Дата -> 0, If-Match -> 0, If-None- Матч -> 0,
Пользователь-Агент ->
32), ложь, правда, akka.util.ConstantFun $$$ Lambda $ 4534/1539966798 @ 69c23cd4, akka.util.ConstantFun $$$ Lambda $ 4534/1539966798 @ 69c23cd4, akka.util.ConstantFun $$$ Lambda $ 4535/297570074 @ 6b426c59), None, TCPTransport), Новый, 1
второй), akka.http.scaladsl.HttpsConnectionContext @ 7e0f3726, akka.event.MarkerLoggingAdapter @ 74f3a78b)))
полностью заполнен, потому что пул в настоящее время не обрабатывается
запросы достаточно быстро, чтобы обработать входящий запрос загрузки. Пожалуйста, повторите
запрос позже. Увидеть
http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html
для получения дополнительной информации.
Вот как выглядит сводка теста Гатлинга:
---- Распределение времени отклика ----------------------------------------
t <800 мс 0 (0%) </p>
800 мс
t> 1200 мс 90 (9%)
не удалось 910 (91%)
Когда я выполняю 100 одновременных запросов, половина из них терпит неудачу. Итак, все еще близко к удовлетворению.
Это новый код:
`` `
path("uploadfile") {
withRequestTimeout(20.seconds) {
extractRequestContext { ctx =>
implicit val materializer = ctx.materializer
extractActorSystem { actorSystem =>
fileUpload("csv") {
case (metadata, byteSource) =>
val uploadFuture = byteSource.runWith(S3Uploader.sink("s3FileKey")(actorSystem, materializer))
onComplete(uploadFuture) {
case Success(_) => complete(StatusCodes.OK)
case Failure(_) => complete(StatusCodes.FailedDependency)
}
}
}
}
}
}
def sink(s3Key: String)(implicit as: ActorSystem, m: Materializer) = {
val regionProvider = new AwsRegionProvider {
def getRegion: String = Regions.EU_WEST_3.getName
}
val settings = new S3Settings(MemoryBufferType, None, new DefaultAWSCredentialsProviderChain(), regionProvider, false, None, ListBucketVersion2)
val s3Client = new S3Client(settings)(as, m)
s3Client.multipartUpload("S3_BUCKET", s3Key)
}
`` `
Полный код с обеих конечных точек можно увидеть здесь
У меня есть пара вопросов.
1) Это функция? Это то, что мы можем назвать противодавлением?
2) Если я хотел бы, чтобы этот код вел себя как старый подход с временным файлом (нет неудавшихся запросов, и все они в какой-то момент заканчиваются), что мне делать? Я пытался реализовать очередь для потока (ссылка на источник ниже), но это не имело никакого значения. Код можно увидеть здесь .
(* ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ * Я все еще новичок в Scala, пытающийся быстро понять потоки akka и найти какой-то обходной путь для этой проблемы. Есть большие шансы, что в этом коде есть что-то простое. * ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ *)