Поток соединителя Alpakka S3 не будет обрабатывать нагрузку, создавая akka.stream.BufferOverflowException - PullRequest
0 голосов
/ 25 августа 2018

У меня есть сервис akka-http, и я пробую разъем alpakka s3 для загрузки файлов. Ранее я использовал временный файл, а затем загружал его с помощью Amazon SDK. Этот подход потребовал внесения некоторых изменений в Amazon SDK, чтобы сделать его более похожим на scala, но он мог обрабатывать даже 1000 запросов одновременно. Пропускная способность не была удивительной, но все запросы в конечном итоге прошли. Вот код до изменений, без альпакки:

`` `

path("uploadfile") {
    withRequestTimeout(20.seconds) {
        storeUploadedFile("csv", tempDestination) {
            case (metadata, file) =>
                val uploadFuture = upload(file, file.toPath.getFileName.toString)

                onComplete(uploadFuture) {
                    case Success(_) => complete(StatusCodes.OK)
                    case Failure(_) => complete(StatusCodes.FailedDependency)
                }
            }
        }
    }
}

case class S3UploaderException(msg: String) extends Exception(msg)

def upload(file: File, key: String): Future[String] = {
    val s3Client = AmazonS3ClientBuilder.standard()
        .withCredentials(new DefaultAWSCredentialsProviderChain())
        .withRegion(Regions.EU_WEST_3)
        .build()

    val promise = Promise[String]()

    val listener = new ProgressListener() {
        override def progressChanged(progressEvent: ProgressEvent): Unit = {
            (progressEvent.getEventType: @unchecked) match {
                case ProgressEventType.TRANSFER_FAILED_EVENT => promise.failure(S3UploaderException(s"Uploading a file with a key: $key"))
                case ProgressEventType.TRANSFER_COMPLETED_EVENT |
                     ProgressEventType.TRANSFER_CANCELED_EVENT => promise.success(key)
            }
        }
    }

    val request = new PutObjectRequest("S3_BUCKET", key, file)
    request.setGeneralProgressListener(listener)

    s3Client.putObject(request)

    promise.future
}

`` `

Когда я изменил это, чтобы использовать соединитель alpakka, код выглядит намного лучше, поскольку мы можем просто соединить ByteSource и alpakka Sink вместе. Однако этот подход не может справиться с такой большой нагрузкой. Когда я выполняю 1000 запросов одновременно (файлы 10 КБ), проходит менее 10%, а остальное не выполняется, за исключением:

akka.stream.alpakka.s3.impl.FailedUpload: Превышено настроено значение max-open-запросы [32]. Это означает, что очередь запросов этот бассейн (HostConnectionPoolSetup (bargain-test.s3-eu-west-3.amazonaws.com, 443, ConnectionPoolSetup (ConnectionPoolSettings (4,0,5,32,1,30 секунд, ClientConnectionSettings (Some (User-Agent: akka-http / 10.1.3), 10 секунд, 1 минута, 512, None, WebSocketSettings (, пинг, Duration.Inf, akka.http.impl.settings.WebSocketSettingsImpl $$$ Lambda $ 4787/1279590204 @ 4d809f4c), Список (), ParserSettings (2048,16,64,64,8192 , 64,8388608,256,1048576, Строгий, RFC6265, правда, Set (), Full, Error, Карта (If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, по умолчанию -> 12, Content-MD5 -> 0, Дата -> 0, If-Match -> 0, If-None- Матч -> 0, Пользователь-Агент -> 32), ложь, правда, akka.util.ConstantFun $$$ Lambda $ 4534/1539966798 @ 69c23cd4, akka.util.ConstantFun $$$ Lambda $ 4534/1539966798 @ 69c23cd4, akka.util.ConstantFun $$$ Lambda $ 4535/297570074 @ 6b426c59), None, TCPTransport), Новый, 1 второй), akka.http.scaladsl.HttpsConnectionContext @ 7e0f3726, akka.event.MarkerLoggingAdapter @ 74f3a78b))) полностью заполнен, потому что пул в настоящее время не обрабатывается запросы достаточно быстро, чтобы обработать входящий запрос загрузки. Пожалуйста, повторите запрос позже. Увидеть http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html для получения дополнительной информации.

Вот как выглядит сводка теста Гатлинга:

---- Распределение времени отклика ---------------------------------------- t <800 мс 0 (0%) </p>

800 мс

t> 1200 мс 90 (9%)

не удалось 910 (91%)


Когда я выполняю 100 одновременных запросов, половина из них терпит неудачу. Итак, все еще близко к удовлетворению.

Это новый код: `` `

path("uploadfile") {
    withRequestTimeout(20.seconds) {
        extractRequestContext { ctx =>
            implicit val materializer = ctx.materializer

            extractActorSystem { actorSystem =>

                fileUpload("csv") {

                    case (metadata, byteSource) =>

                        val uploadFuture = byteSource.runWith(S3Uploader.sink("s3FileKey")(actorSystem, materializer))

                        onComplete(uploadFuture) {
                            case Success(_) => complete(StatusCodes.OK)
                            case Failure(_) => complete(StatusCodes.FailedDependency)
                        }
                }            
            }
        }
    }
}

def sink(s3Key: String)(implicit as: ActorSystem, m: Materializer) = {
    val regionProvider = new AwsRegionProvider {
        def getRegion: String = Regions.EU_WEST_3.getName
    }

    val settings = new S3Settings(MemoryBufferType, None, new DefaultAWSCredentialsProviderChain(), regionProvider, false, None, ListBucketVersion2)
    val s3Client = new S3Client(settings)(as, m)

    s3Client.multipartUpload("S3_BUCKET", s3Key)
}

`` `

Полный код с обеих конечных точек можно увидеть здесь

У меня есть пара вопросов.

1) Это функция? Это то, что мы можем назвать противодавлением?

2) Если я хотел бы, чтобы этот код вел себя как старый подход с временным файлом (нет неудавшихся запросов, и все они в какой-то момент заканчиваются), что мне делать? Я пытался реализовать очередь для потока (ссылка на источник ниже), но это не имело никакого значения. Код можно увидеть здесь .

(* ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ * Я все еще новичок в Scala, пытающийся быстро понять потоки akka и найти какой-то обходной путь для этой проблемы. Есть большие шансы, что в этом коде есть что-то простое. * ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ *)

1 Ответ

0 голосов
/ 27 августа 2018

Это функция противодавления.

Exceeded configured max-open-requests value of [32] В конфигурации max-open-requests по умолчанию установлено на 32.Потоковая передача используется для работы с большим количеством данных, а не для обработки большого количества запросов в секунду.

Разработчики Akka должны были что-то добавить для max-open-requests.Они выбирают 32 почему-то точно.И они понятия не имели, для чего он будет использоваться.Может быть, он отправляет 1000 файлов по 32 КБ или 1000 файлов по 1 ГБ одновременно?Они не знаютНо они все еще хотят убедиться, что по умолчанию (и 80% людей используют значения по умолчанию) приложения будут обрабатываться изящно и безопасно.Поэтому им пришлось ограничить вычислительную мощность.

Вы попросили сделать 1000 «сейчас», но я вполне уверен, что AWS не отправлял 1000 файлов одновременно, а использовал некоторую очередь, что также может быть полезно для вас, если выЕсть много небольших файлов для загрузки.

Но это прекрасно, чтобы настроить его для вашего случая!Если вы знаете, что ваша машина и целевое устройство позаботится о большем количестве одновременных подключений, вы можете изменить значение на более высокое значение.

Кроме того, для многих HTTP-вызовов используйте пул кэшированных подключений хоста .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...