Многоэтапная загрузка s3 всегда завершается с ошибкой во второй части с тайм-аутом - PullRequest
0 голосов
/ 07 февраля 2019

Я пытаюсь получить простое подтверждение концепции многочастной загрузки, работающей в Kotlin, с помощью клиента amazon s3 на основе документации .Первая часть успешно загружена, и я получил ответ с etag.Вторая часть не загружает одну вещь и время ожидания.Это всегда терпит неудачу после первой части.Есть ли какая-то очистка соединения, которую мне нужно как-то сделать вручную?

Учетные данные и права все в порядке.Магические числа, приведенные ниже, предназначены только для того, чтобы получить минимальный размер части 5 МБ.

Что я здесь не так делаю?

fun main() {
    val amazonS3 =
        AmazonS3ClientBuilder.standard().withRegion(Regions.EU_WEST_1).withCredentials(ProfileCredentialsProvider())
            .build()


    val bucket = "io.inbot.sandbox"
    val key = "test.txt"
    val multipartUpload =
        amazonS3.initiateMultipartUpload(InitiateMultipartUploadRequest(bucket, key))

    var pn=1
    var off=0L
    val etags = mutableListOf<PartETag>()

    for( i in 0.rangeTo(5)) {

        val buf = ByteArrayOutputStream()
        val writer = buf.writer().buffered()
        for(l in 0.rangeTo(100000)) {
            writer.write("part $i - Hello world for the $l'th time this part.\n")
        }
        writer.flush()
        writer.close()

        val bytes = buf.toByteArray()


        val md = MessageDigest.getInstance("MD5")
        md.update(bytes)
        val md5 = Base64.encodeBytes(md.digest())
        println("going to write ${bytes.size}")
        bytes.inputStream()
        var partRequest = UploadPartRequest().withBucketName(bucket).withKey(key)
            .withUploadId(multipartUpload.uploadId)
            .withFileOffset(off)
            .withPartSize(bytes.size.toLong())
            .withPartNumber(pn++)
            .withMD5Digest(md5)
            .withInputStream(bytes.inputStream())
            .withGeneralProgressListener<UploadPartRequest> { it ->
                println(it.bytesTransferred)
            }
        if(i == 5) {
            partRequest = partRequest.withLastPart(true)
        }

        off+=bytes.size

        val partResponse = amazonS3.uploadPart(partRequest)

        etags.add(partResponse.partETag)
        println("part ${partResponse.partNumber} ${partResponse.eTag} ${bytes.size}")

    }
    val completeMultipartUpload =
        amazonS3.completeMultipartUpload(CompleteMultipartUploadRequest(bucket, key, multipartUpload.uploadId, etags))


}

Во второй части всегда происходит сбой с

Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: F419872A24BB5526; S3 Extended Request ID: 48XWljQNuOH6LJG9Z85NJOGVy4iv/ru44Ai8hxEP+P+nqHECXZwWNwBoMyjiQfxKpr6icGFjxYc=), S3 Extended Request ID: 48XWljQNuOH6LJG9Z85NJOGVy4iv/ru44Ai8hxEP+P+nqHECXZwWNwBoMyjiQfxKpr6icGFjxYc=
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1630)

Просто чтобы выгрузить некоторые ответы, которые я не ищу, я собираюсь не загружать файлы, а в конечном итоге иметь возможность передавать потоки произвольной длины на s3, просто загружая части до их завершения, а затем комбинируя их,Таким образом, я не могу использовать TransferManager, потому что это требует от меня заранее знать размер, а я не буду.Кроме того, я не хочу делать это в виде файла, так как он будет выполняться в приложении сервера Dockerized.Поэтому я действительно хочу загрузить произвольное количество деталей.Я счастлив сделать это последовательно;хотя я не возражаю против параллелизма.

Я также использовал «com.github.alexmojaki: s3-stream-upload: 1.0.1», но, похоже, в памяти сохраняется много состояния (у меня кончились пару раз), поэтому я хотел бы заменить его на что-то более простое.

Обновление. Спасибо Илья в комментариях ниже.Удаление withFileOffset исправляет вещи.

1 Ответ

0 голосов
/ 08 февраля 2019

Удаление withFileOffset исправляет вещи.Спасибо @Ilya за указание на это.

Вот простой вывод, который я реализовал, который на самом деле работает.

package io.inbot.aws

import com.amazonaws.auth.profile.ProfileCredentialsProvider
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult
import com.amazonaws.services.s3.model.PartETag
import com.amazonaws.services.s3.model.UploadPartRequest
import mu.KotlinLogging
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.security.MessageDigest
import java.util.Base64

private val logger = KotlinLogging.logger {  }
class S3Writer(
    private val amazonS3: AmazonS3,
    private val bucket: String,
    private val key: String,
    private val threshold: Int = 5*1024*1024
) : OutputStream(), AutoCloseable {

    private val etags: MutableList<PartETag> = mutableListOf()

    private val multipartUpload: InitiateMultipartUploadResult = this.amazonS3.initiateMultipartUpload(InitiateMultipartUploadRequest(bucket, key))

    private val currentPart = ByteArrayOutputStream(threshold)

    private var partNumber = 1

    override fun write(b: Int) {
        currentPart.write(b)
        if(currentPart.size() > threshold) {
            sendPart()
        }
    }

    private fun sendPart(last: Boolean = false) {
        logger.info { "sending part $partNumber" }
        currentPart.flush()

        val bytes = currentPart.toByteArray()

        val md = MessageDigest.getInstance("MD5")
        md.update(bytes)
        val md5 = Base64.getEncoder().encode(md.digest())
        var partRequest = UploadPartRequest().withBucketName(bucket).withKey(key)
            .withUploadId(multipartUpload.uploadId)
            .withPartSize(currentPart.size().toLong())
            .withPartNumber(partNumber++)
            .withMD5Digest(md5.contentToString())
            .withInputStream(bytes.inputStream())

        if(last) {
            logger.info { "final part" }
            partRequest = partRequest.withLastPart(true)
        }

        val partResponse = amazonS3.uploadPart(partRequest)

        etags.add(partResponse.partETag)

        currentPart.reset()

    }


    override fun close() {
        if(currentPart.size() > 0) {
            sendPart(true)
        }
        logger.info { "completing" }
        amazonS3.completeMultipartUpload(CompleteMultipartUploadRequest(bucket, key, multipartUpload.uploadId, etags))
    }

}


fun main() {
    val amazonS3 =
        AmazonS3ClientBuilder.standard().withRegion(Regions.EU_WEST_1).withCredentials(ProfileCredentialsProvider())
            .build()

    val bucket = "io.inbot.sandbox"
    val key = "test.txt"

    try {
        S3Writer(amazonS3, bucket, key).use {
            val w = it.bufferedWriter()
            for (i in 0.rangeTo(1000000)) {
                w.write("Line $i: hello again ...\n")
            }
        }
    } catch (e: Throwable) {
        logger.error(e.message,e)
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...