Разархивирование файлов PySpark: Какой подход хорош для разархивирования файлов и хранения файлов csv в Delta Table? - PullRequest
0 голосов
/ 30 октября 2019

У меня есть zip-файлы, хранящиеся в Amazon s3, затем список Python как ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"], мне нужно распаковать все эти файлы с помощью Spark Cluster и сохранить все CSV-файлы в таблицу дельта-формата. Я хотел бы знать более быстрый подход к обработке, чем мой текущий подход:

1) У меня есть bucle для для итерации в моем списке Python.

2) I 'Получение zip-файлов из s3 с использованием Python Boto3 s3.bucket.Object(file)

3) Я распаковываю файлы, используя следующий код

import io
import boto3
import shutil
import zipfile
for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
    obj = s3.bucket.Object(file)
    with io.BytesIO(obj.get()["Body"].read()) as tf:
        tf.seek(0)
        with zipfile.ZipFile(tf, mode='r') as zipf:
            for subfile in zipf.namelist():
                zipf.extract(subfile, outputZip)
    dbutils.fs.cp("file:///databricks/driver/{0}".format(outputZip), "dbfs:" + outputZip, True)
    shutil.rmtree(outputZip)
    dbutils.fs.rm("dbfs:" + outputZip, True)

4) Мои файлы распаковываются в узле драйвератогда исполнители не могут получить доступ к этим файлам (я не нахожу способ сделать это), поэтому я перемещаю все эти CSV-файлы в DBFS, используя dbutils.fs.cp()

5) Я читаю все CSV-файлы изDBFS с использованием Pyspark Dataframe, и я записываю это в дельта-таблицуиз S3 в таблицу Delta за меньшее время, чем мой предыдущий процесс. Я полагаю, что я могу распараллелить некоторые из этих процессов как шаг 1), я бы хотел избежать шага копирования в DBFS, потому что мне не нужно там хранить данные, а также мне нужно удалять файлы CSV после каждого входа вдельта-таблица, чтобы избежать ошибки памяти на диске узла драйвера. Любой совет?

1 Ответ

0 голосов
/ 08 ноября 2019

Что ж, возможно несколько вариантов решения:

  1. Вы можете прочитать все файлы вместе (если это позволяет схема) с помощью df=spark.read.csv("s3://mybucket") и записать кадр данных как дельта с помощью df.write.format("delta").save(path)
  2. Вы можете прочитать каждый файл отдельно в кадре данных и напрямую добавить в существующую дельта-таблицу (даже если она пустую), не сохраняя ее в DBFS. Для более подробной информации: https://docs.databricks.com/delta/delta-batch.html#append-using-dataframes
  3. Вы можете прочитать каждый файл отдельно в кадре данных и объединить его в существующий основной кадр данных. В конце концов, вы можете записать основной фрейм данных в виде дельта-таблицы.

Вариант 3 будет выглядеть примерно так:

    import io
    import boto3
    import shutil
    import zipfile
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("name").getOrCreate()

    schema = StructType([
    \\ YOUR DATA SCHMEA
    ])

    df = spark.createDataFrame([], schema)

    for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
        obj = s3.bucket.Object(file)
        with io.BytesIO(obj.get()["Body"].read()) as tf:
            tf.seek(0)
            with zipfile.ZipFile(tf, mode='r') as zipf:
                for subfile in zipf.namelist():
                    zipf.extract(subfile, outputZip)
        tempdf = spark.read.option("header", "true").csv(outputZip)
        df = df.union(tempdf)      

    df.write.format("delta").save(path)
...