У меня есть zip-файлы, хранящиеся в Amazon s3, затем список Python как ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]
, мне нужно распаковать все эти файлы с помощью Spark Cluster и сохранить все CSV-файлы в таблицу дельта-формата. Я хотел бы знать более быстрый подход к обработке, чем мой текущий подход:
1) У меня есть bucle для для итерации в моем списке Python.
2) I 'Получение zip-файлов из s3 с использованием Python Boto3 s3.bucket.Object(file)
3) Я распаковываю файлы, используя следующий код
import io
import boto3
import shutil
import zipfile
for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
obj = s3.bucket.Object(file)
with io.BytesIO(obj.get()["Body"].read()) as tf:
tf.seek(0)
with zipfile.ZipFile(tf, mode='r') as zipf:
for subfile in zipf.namelist():
zipf.extract(subfile, outputZip)
dbutils.fs.cp("file:///databricks/driver/{0}".format(outputZip), "dbfs:" + outputZip, True)
shutil.rmtree(outputZip)
dbutils.fs.rm("dbfs:" + outputZip, True)
4) Мои файлы распаковываются в узле драйвератогда исполнители не могут получить доступ к этим файлам (я не нахожу способ сделать это), поэтому я перемещаю все эти CSV-файлы в DBFS, используя dbutils.fs.cp()
5) Я читаю все CSV-файлы изDBFS с использованием Pyspark Dataframe, и я записываю это в дельта-таблицуиз S3 в таблицу Delta за меньшее время, чем мой предыдущий процесс. Я полагаю, что я могу распараллелить некоторые из этих процессов как шаг 1), я бы хотел избежать шага копирования в DBFS, потому что мне не нужно там хранить данные, а также мне нужно удалять файлы CSV после каждого входа вдельта-таблица, чтобы избежать ошибки памяти на диске узла драйвера. Любой совет?