Pyspark: загрузите файл tar.gz в фрейм данных и выполните фильтрацию по имени файла. - PullRequest
1 голос
/ 07 февраля 2020

У меня есть файл tar.gz, содержащий несколько файлов. Иерархия выглядит как ниже. Я собираюсь прочитать файл tar.gz, отфильтровать содержимое b.tsv как метаданные stati c, где все остальные файлы являются реальными записями.

gzfile.tar.gz
|- a.tsv
|- b.tsv
|- thousand more files.

При загрузке pyspark я Я могу загрузить файл в информационный кадр. Я использовал команду:

spark = SparkSession.\
        builder.\
        appName("Loading Gzip Files").\
        getOrCreate()
input = spark.read.load('/Users/jeevs/git/data/gzfile.tar.gz',\
          format='com.databricks.spark.csv',\
          sep = '\t'

С целью фильтрации я добавил имя файла

from  pyspark.sql.functions import input_file_name
input.withColumn("filename", input_file_name())

, которое теперь генерирует данные следующим образом:

|_c0 |_c1 |filename |
|b.tsv0000666000076500001440035235677713575350214013124 0ustar  netsaintusers1|Lynx 2.7.1|file:///Users/jeevs/git/data/gzfile.tar.gz|
|2|Lynx 2.7|file:///Users/jeevs/git/data/gzfile.tar.gz|

Конечно, поле файла заполняется файлом tar.gz, что делает этот подход бесполезным. Еще более раздражающая проблема заключается в том, что _c0 заполняется filename + garbage + first row values

На данный момент, мне интересно, становится ли странным чтение самого файла, так как это tar файл .gz. Когда мы выполнили v1 этой обработки (спарк 0.9), у нас был еще один шаг, который загружал данные из s3 в блок ec2, извлекал и записывал обратно в s3. Я пытаюсь избавиться от этих шагов.

Заранее спасибо!

...