Как прочитать файл .tar, содержащий паркет на S3, как кадры данных в Spark? - PullRequest
2 голосов
/ 08 апреля 2020

Мне нужно загрузить .tar файл на S3, который содержит несколько паркетов с различной схемой, используя Scala / Spark. В идеале я хотел бы прочитать один из этих паркетов в кадре данных Spark. Я попытался получить объект s3 и затем преобразовать его в поток ввода tar, используя org. apache .commons.compress.archivers.tar.TarArchiveInputStream, и он смог создать поток ввода tar, но не смог прочитать записи tar.

val s3client: AmazonS3 = AmazonS3ClientBuilder.
      standard().
      withCredentials(new InstanceProfileCredentialsProvider()).
      withRegion(my_region).
      build();

val tarFile = s3client.getObject(my_bucket, my_tar_file)
val tarInputStream = new TarArchiveInputStream(tarFile.getObjectContent)
tarInputStream.getNextTarEntry() <-- error thrown in this line

Ошибка:

java.io.IOException: Error detected parsing the header
  at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:240)
  ... 52 elided
Caused by: java.lang.IllegalArgumentException: Invalid byte 48 at offset 7 in '00755{NUL}00' len=8
  at org.apache.commons.compress.archivers.tar.TarUtils.parseOctal(TarUtils.java:127)
  at org.apache.commons.compress.archivers.tar.TarUtils.parseOctalOrBinary(TarUtils.java:171)
  at org.apache.commons.compress.archivers.tar.TarArchiveEntry.parseTarHeader(TarArchiveEntry.java:935)
  at org.apache.commons.compress.archivers.tar.TarArchiveEntry.parseTarHeader(TarArchiveEntry.java:924)
  at org.apache.commons.compress.archivers.tar.TarArchiveEntry.<init>(TarArchiveEntry.java:328)
  at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:238)

Кто-нибудь знает, как правильно извлечь часть файла tar на s3 в Spark?

Ответы [ 2 ]

0 голосов
/ 09 апреля 2020

Следуйте этому примеру. Я надеюсь, что вы используете tar.gz

AWSCredentials credentials = new BasicAWSCredentials("accessKey", "secretKey");
AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(Regions.US_EAST_1).withCredentials(credentialsProvider).build();
S3Object object = s3Client.getObject("bucketname", "file.tar.gz");
S3ObjectInputStream objectContent = object.getObjectContent();

TarArchiveInputStream tarInputStream = new TarArchiveInputStream(new GZIPInputStream(objectContent));
TarArchiveEntry currentEntry;
while((currentEntry = tarInputStream.getNextTarEntry()) != null) {
    if(currentEntry.getName().equals("1/foo.bar") && currentEntry.isFile()) {
        FileOutputStream entryOs = new FileOutputStream("foo.bar");
        IOUtils.copy(tarInputStream, entryOs);
        entryOs.close();
        break;
    }
}
objectContent.abort();  // Warning at this line
tarInputStream.close(); // warning at this line

scala эквивалент равен

    val credentials: AWSCredentials =
      new BasicAWSCredentials("accessKey", "secretKey")
    val credentialsProvider: AWSCredentialsProvider =
      new AWSStaticCredentialsProvider(credentials)
    val s3Client: AmazonS3 = AmazonS3ClientBuilder
      .standard()
      .withRegion(Regions.US_EAST_1)
      .withCredentials(credentialsProvider)
      .build()
    val s3object: S3Object = s3Client.getObject("bucketname", "file.tar.gz")
    val objectContent: S3ObjectInputStream = s3object.getObjectContent
    val tarInputStream: TarArchiveInputStream = new TarArchiveInputStream(
      new GZIPInputStream(objectContent))
    var currentEntry: TarArchiveEntry = null
    while ((currentEntry = tarInputStream.getNextTarEntry) != null) 
if (currentEntry.getName ==("1/foo.bar") && currentEntry.isFile) {
      val entryOs: FileOutputStream = new FileOutputStream("foo.bar")
      IOUtils.copy(tarInputStream, entryOs)
      entryOs.close()
    }
    objectContent.abort()
    tarInputStream.close()
  }

Обновление:

, поскольку вы используете только tar, а не gzip

так что вы должны читать вот так ...

val tarInputStream = new TarArchiveInputStream(new FileInputStream(
    tarFile.getObjectContent))
0 голосов
/ 09 апреля 2020

В вашем случае вы передаете объект как InputStream. Я предлагаю передать его как GzipInputstream, а затем прочитать записи:

val tarInputStream = new TarArchiveInputStream(tarFile.getObjectContent)

val tarInputStream = new TarArchiveInputStream(new GZIPInputStream(tarFile))
val entry: TarArchiveEntry = readEntries(tarInputStream)
def readEntries(tarInputStream: TarArchiveInputStream): TarArchiveEntry = {
  var currentEntry = Option(tarInputStream.getNextTarEntry())
  // you can use functional approach with foldLeft, reduce or something else or while loop
  // implementation details here
}

Вы можете найти, как использовать TarArchiveInputStream здесь

...