Я разрабатываю задание PySpark, которое читает текстовые файлы и записывает файлы паркета в AWS S3.
Мне нужно добавить столбец в каждой строке паркета, который описывает исходный файл оригинала текстовая строка.
Я использую input_file_name
, но не вижу правильного имени исходного файла в столбце вывода. Кажется, что используют одно и то же имя файла для целых задач или для неправильного раздела.
Я не знаю, вызвано ли это перераспределением, которое мне нужно сделать, или потому что я читаю всю папку (префикс ) в S3.
Вот пример моего кода:
rdd = spark.sparkContext.textFile("s3a://source-bucket/prefix/")
rdd = rdd.repartition(numPartitions=(120))
rdd = rdd.map(lambda line: custom_regex_parser)
df = spark.createDataFrame(rdd, schema=custom_source_schema)
df = df..withColumn("source_file_name", input_file_name)
df.write.mode('overwrite').parquet("s3://dest-bucket/prefix/")
В корзине S3 у меня есть много файлов GZ, различного размера (сжатые от нескольких КБ до 50 МБ). Мне нужно сделать перераспределение, чтобы избежать случайных проблем с памятью в некоторых задачах Spark.
Может ли это перераспределение вызвать проблему с input_file_name?
После этого, есть ли у вас какие-либо предложения для лучшего распространения данных по исполнители однородно? Это довольно много времени ...