Я использую pyspark для сохранения фрейма данных в виде файла паркета или в виде файла csv с этим:
def write_df_as_parquet_file(df, path, mode="overwrite"):
df = df.repartition(1) # join partitions to produce 1 parquet file
dfw = df.write.format("parquet").mode(mode)
dfw.save(path)
def write_df_as_csv_file(df, path, mode="overwrite", header=True):
df = df.repartition(1) # join partitions to produce 1 csv file
header = "true" if header else "false"
dfw = df.write.format("csv").option("header", header).mode(mode)
dfw.save(path)
Но при этом файл parquet / csv сохраняется в папке с именем path
, гдеон сохраняет несколько других файлов, которые нам не нужны, таким образом:
Изображение: https://ibb.co/9c1D8RL
По сути, я хотел бы создатьнекоторая функция, которая сохраняет файл в определенном месте, используя вышеуказанные методы, а затем перемещает файл CSV или PARQUET в новое место.Как:
def write_df_as_parquet_file(df, path, mode="overwrite"):
# save df in one file inside tmp_folder
df = df.repartition(1) # join partitions to produce 1 parquet file
dfw = df.write.format("parquet").mode(mode)
tmp_folder = path + "TEMP"
dfw.save(tmp_folder)
# move parquet file from tmp_folder to path
copy_file(tmp_folder + "*.parquet", path)
remove_folder(tmp_folder)
Как я могу это сделать?Как мне реализовать copy_file
или remove_folder
?Я видел несколько решений в scala, в которых для этого используется API Hadoop, но я не смог заставить эту работу работать на python.Я думаю, что мне нужно использовать sparkContext, но я все еще изучаю Hadoop и не нашел способа сделать это.