akka.http.scaladsl.model.ParsingException: неожиданный конец составной сущности при загрузке большого файла на S3 с использованием akka http - PullRequest
1 голос
/ 19 июня 2020

Я пытаюсь загрузить большой файл (на данный момент 90 МБ) на S3, используя Akka HTTP с коннектором Alpakka S3. Он отлично работает для небольших файлов (25 МБ), но когда я пытаюсь загрузить большой файл (90 МБ), я получаю следующую ошибку:

akka.http.scaladsl.model.ParsingException: Unexpected end of multipart entity
at akka.http.scaladsl.unmarshalling.MultipartUnmarshallers$$anonfun$1.applyOrElse(MultipartUnmarshallers.scala:108)
at akka.http.scaladsl.unmarshalling.MultipartUnmarshallers$$anonfun$1.applyOrElse(MultipartUnmarshallers.scala:103)
at akka.stream.impl.fusing.Collect$$anon$6.$anonfun$wrappedPf$1(Ops.scala:227)
at akka.stream.impl.fusing.SupervisedGraphStageLogic.withSupervision(Ops.scala:186)
at akka.stream.impl.fusing.Collect$$anon$6.onPush(Ops.scala:229)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:510)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:376)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:485)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:581)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:749)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:739)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:765)
at akka.actor.Actor.aroundReceive(Actor.scala:539)
at akka.actor.Actor.aroundReceive$(Actor.scala:537)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:671)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)
at akka.actor.ActorCell.invoke(ActorCell.scala:583)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
at akka.dispatch.Mailbox.run(Mailbox.scala:229)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Хотя в конце я получаю сообщение об успехе, но файл загружен не полностью. Он загружается только 45-50 МБ.

Я использую следующий код: S3Utility. scala

    class S3Utility(implicit as: ActorSystem, m: Materializer) {
  private val bucketName = "test"

  def sink(fileInfo: FileInfo): Sink[ByteString, Future[MultipartUploadResult]] = {
    val fileName = fileInfo.fileName
    S3.multipartUpload(bucketName, fileName)
  }
}

Маршруты:

def uploadLargeFile: Route =
  post {
    path("import" / "file") {
      extractMaterializer { implicit materializer =>
        withoutSizeLimit {
          fileUpload("file") {
            case (metadata, byteSource) =>
              logger.info(s"Request received to import large file: ${metadata.fileName}")
              val uploadFuture = byteSource.runWith(s3Utility.sink(metadata))
              onComplete(uploadFuture) {
                case Success(result) =>
                  logger.info(s"Successfully uploaded file")
                  complete(StatusCodes.OK)
                case Failure(ex) =>
                  println(ex, "Error in uploading file")
                  complete(StatusCodes.FailedDependency, ex.getMessage)
              }
          }
        }
      }
    }
  }

Любая помощь будут оценены. Спасибо

1 Ответ

1 голос
/ 20 июня 2020

Стратегия 1

Можете ли вы разбить файл на более мелкие части и повторить попытку, вот пример кода:

AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("some-kind-of-endpoint"))
            .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("user", "pass")))
            .disableChunkedEncoding()
            .withPathStyleAccessEnabled(true)
            .build();

    // Create a list of UploadPartResponse objects. You get one of these
    // for each part upload.
    List<PartETag> partETags = new ArrayList<PartETag>();

    // Step 1: Initialize.
    InitiateMultipartUploadRequest initRequest = new
            InitiateMultipartUploadRequest("bucket", "key");
    InitiateMultipartUploadResult initResponse =
            s3Client.initiateMultipartUpload(initRequest);

    File file = new File("filepath");
    long contentLength = file.length();
    long partSize = 5242880; // Set part size to 5 MB.

    try {
        // Step 2: Upload parts.
        long filePosition = 0;
        for (int i = 1; filePosition < contentLength; i++) {
            // Last part can be less than 5 MB. Adjust part size.
            partSize = Math.min(partSize, (contentLength - filePosition));

            // Create a request to upload a part.
            UploadPartRequest uploadRequest = new UploadPartRequest()
                    .withBucketName("bucket").withKey("key")
                    .withUploadId(initResponse.getUploadId()).withPartNumber(i)
                    .withFileOffset(filePosition)
                    .withFile(file)
                    .withPartSize(partSize);

            // Upload part and add response to our list.
            partETags.add(
                    s3Client.uploadPart(uploadRequest).getPartETag());

            filePosition += partSize;
        }

        // Step 3: Complete.
        CompleteMultipartUploadRequest compRequest = new
                CompleteMultipartUploadRequest(
                "bucket",
                "key",
                initResponse.getUploadId(),
                partETags);

        s3Client.completeMultipartUpload(compRequest);
    } catch (Exception e) {
        s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(
                "bucket", "key", initResponse.getUploadId()));
    }

Стратегия 2

Увеличьте idle-timeout HTTP-сервера Akka (просто установите его на infinite), например:

akka.http.server.idle-timeout=infinite

Это увеличит период времени, в течение которого сервер ожидает простоя. По умолчанию его значение составляет 60 секунд. И если сервер не сможет загрузить файл в течение этого периода времени, он закроет соединение и выдаст ошибку «Неожиданный конец составного объекта».

...