Обработка потоковой передачи TarArchiveEntry в S3 Bucket из файла .tar.gz - PullRequest
0 голосов
/ 02 октября 2018

Я использую aws Lamda для распаковки и обхода файлов tar.gz, а затем загружаю их обратно в дефлированный s3, сохраняя исходную структуру каталогов.

Я столкнулся с проблемой потоковой передачи TarArchiveEntry в корзину S3 через PutObjectRequest.В то время как первая запись успешно транслируется, при попытке getNextTarEntry () в TarArchiveInputStream генерируется нулевой указатель из-за того, что базовый инфлятор GunzipCompress имеет значение null, которое имеет соответствующее значение до s3.putObject (новый PutObjectRequest (...))вызов.

Мне не удалось найти документацию о том, как / почему атрибут инфлятора входного потока gz устанавливается на нуль после частичной отправки на s3. РЕДАКТИРОВАТЬ Дальнейшие исследования показали, что вызов AWS, по-видимому, закрывает поток ввода после завершения загрузки заданной длины содержимого ... не удалось найти, как предотвратить это поведение.

Ниже, по сути, выглядит мой код.Заранее благодарю за помощь, комментарии и предложения.

public String handleRequest(S3Event s3Event, Context context) {

    try {
        S3Event.S3EventNotificationRecord s3EventRecord = s3Event.getRecords().get(0);
        String s3Bucket = s3EventRecord.getS3().getBucket().getName();

        // Object key may have spaces or unicode non-ASCII characters.
        String srcKey = s3EventRecord.getS3().getObject().getKey();

        System.out.println("Received valid request from bucket: " + bucketName + " with srckey: " + srcKeyInput);

        String bucketFolder = srcKeyInput.substring(0, srcKeyInput.lastIndexOf('/') + 1);
        System.out.println("File parent directory: " + bucketFolder);

        final AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();

        TarArchiveInputStream tarInput = new TarArchiveInputStream(new GzipCompressorInputStream(getObjectContent(s3Client, bucketName, srcKeyInput)));

        TarArchiveEntry currentEntry = tarInput.getNextTarEntry();

        while (currentEntry != null) {
            String fileName = currentEntry.getName();
            System.out.println("For path = " + fileName);

            // checking if looking at a file (vs a directory)
            if (currentEntry.isFile()) {

                System.out.println("Copying " + fileName + " to " + bucketFolder + fileName + " in bucket " + bucketName);
                ObjectMetadata metadata = new ObjectMetadata();
                metadata.setContentLength(currentEntry.getSize());

                s3Client.putObject(new PutObjectRequest(bucketName, bucketFolder + fileName, tarInput, metadata)); // contents are properly and successfully sent to s3
                System.out.println("Done!");
            }

            currentEntry = tarInput.getNextTarEntry(); // NPE here due underlying gz inflator is null;
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        IOUtils.closeQuietly(tarInput);
    }
}

1 Ответ

0 голосов
/ 02 октября 2018

Это правда, AWS закрывает InputStream, предоставленное PutObjectRequest, и я не знаю, как проинструктировать AWS не делать этого.

Однако вы можете обернуть TarArchiveInputStreamс CloseShieldInputStream из Commons IO , например:

InputStream shieldedInput = new CloseShieldInputStream(tarInput);

s3Client.putObject(new PutObjectRequest(bucketName, bucketFolder + fileName, shieldedInput, metadata));

Когда AWS закроет предоставленный CloseShieldInputStream, базовый TarArchiveInputStream останется открытым.


PS.Я не знаю, что делает ByteArrayInputStream(tarInput.getCurrentEntry()), но это выглядит очень странно.Я проигнорировал это с целью этого ответа.

...