Spark df partitioniong после разбиения на гг / мм / дд - PullRequest
0 голосов
/ 18 января 2020

S3 содержит очень большой сжатый файл (20 ГБ сжатый -> 200 ГБ несжатый). Я хочу прочитать этот файл (к сожалению, распаковать на одно ядро), преобразовать некоторые столбцы sql, а затем вывести на S3 в формате s3_path/year=2020/month=01/day=01/[files 1-200].parquet.

Весь файл будет состоять из данных из та же дата. Это заставляет меня поверить, что вместо использования partitionBy('year','month','day') я должен добавить "year={year}/month={month}/day={day}/" к пути s3, потому что в настоящее время spark записывает один файл за раз в s3 (размер 1 ГБ каждый). Правильно ли мое мышление?

Вот что я сейчас делаю:

df = df\
    .withColumn('year', lit(datetime_object.year))\
    .withColumn('month', lit(datetime_object.month))\
    .withColumn('day', lit(datetime_object.day))

df\
    .write\
    .partitionBy('year','month','day')\
    .parquet(s3_dest_path, mode='overwrite')

Что я думаю:

df = spark.read.format('json')\
    .load(s3_file, schema=StructType.fromJson(my_schema))\
    .repartition(200)
# currently takes a long time decompressing the 20gb s3_file.json.gz

# transform
df.write\
    .parquet(s3_dest_path + 'year={}/month={}/day={}/'.format(year,month,day))

1 Ответ

2 голосов
/ 18 января 2020

Вы, вероятно, сталкиваетесь с проблемой, когда spark записывает данные сначала в какой-то временный каталог, а затем фиксирует его в окончательном месте. В HDFS это делается переименованием. Однако S3 не поддерживает переименования, а копирует данные полностью (только с использованием одного исполнителя). Подробнее об этой теме c см., Например, этот пост: Чрезвычайно медленное время записи S3 из EMR / Spark

Обычный обходной путь - запись в hdfs, а затем использование distcp для копирования распространяется от hdfs до s3

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...