PySpark записывает два фрейма данных в один и тот же раздел, но разделенные папкой - PullRequest
1 голос
/ 13 июля 2020

Я использую Spark для записи двух разных фреймов данных в один и тот же раздел, но я хочу, чтобы они были разделены папками в конце раздела. т.е. первый фрейм данных будет записывать в yyyy/mm/dd/, а второй - в yyyy/mm/dd/rejected/

В настоящее время я могу записать первый фрейм данных в yyyy/mm/dd/, а второй фрейм данных в rejected/yyyy/mm/dd, используя следующий код:

  first_df.repartition('year', 'month', 'day').write \
    .partitionBy('year', 'month', 'day') \
    .mode("append") \
    .csv(f"{output_path}/")

  second_df.repartition('year', 'month', 'day').write \
    .partitionBy('year', 'month', 'day') \
    .mode("append") \
    .csv(f"{output_path}/rejected")

Любые предложения приветствуются

1 Ответ

1 голос
/ 13 июля 2020

Добавьте rejected как буквальное значение к second_df, затем включите в partitionBy т.е.

second_df.withColumn("rej",lit("rejected")) \
    .repartition('year', 'month', 'day').write \
    .partitionBy('year', 'month', 'day','rej') \
    .mode("append") \
    .csv(f"{output_path}")

Другой способ будет использовать hadoop file api для перемещения файлов в соответствующий каталог.

Update:

Rename the directory:

URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
    
fs = FileSystem.get(URI("hdfs://<name_node>:8020"), Configuration())
#rename the directory
fs.rename(Path(f'{output_path}/rej=rejected'),Path(f'{output_path}/rejected'))
...