Как скопировать файл в pyspark / hadoop из python - PullRequest
0 голосов
/ 05 февраля 2019

Я использую pyspark для сохранения фрейма данных в виде файла паркета или в виде файла csv с этим:

def write_df_as_parquet_file(df, path, mode="overwrite"):
    df = df.repartition(1)  # join partitions to produce 1 parquet file
    dfw = df.write.format("parquet").mode(mode)
    dfw.save(path)

def write_df_as_csv_file(df, path, mode="overwrite", header=True):
    df = df.repartition(1)  # join partitions to produce 1 csv file
    header = "true" if header else "false"
    dfw = df.write.format("csv").option("header", header).mode(mode)
    dfw.save(path)

Но при этом файл parquet / csv сохраняется в папке с именем path, гдеон сохраняет несколько других файлов, которые нам не нужны, таким образом:

4 files are created in path, but we only care about the PARQUET file

Изображение: https://ibb.co/9c1D8RL

По сути, я хотел бы создатьнекоторая функция, которая сохраняет файл в определенном месте, используя вышеуказанные методы, а затем перемещает файл CSV или PARQUET в новое место.Как:

def write_df_as_parquet_file(df, path, mode="overwrite"):
    # save df in one file inside tmp_folder
    df = df.repartition(1)  # join partitions to produce 1 parquet file
    dfw = df.write.format("parquet").mode(mode)
    tmp_folder = path + "TEMP"
    dfw.save(tmp_folder)

    # move parquet file from tmp_folder to path
    copy_file(tmp_folder + "*.parquet", path)
    remove_folder(tmp_folder)

Как я могу это сделать?Как мне реализовать copy_file или remove_folder?Я видел несколько решений в scala, в которых для этого используется API Hadoop, но я не смог заставить эту работу работать на python.Я думаю, что мне нужно использовать sparkContext, но я все еще изучаю Hadoop и не нашел способа сделать это.

1 Ответ

0 голосов
/ 05 февраля 2019

Вы можете использовать одну из библиотек Python HDFS, чтобы подключиться к вашему экземпляру HDFS, а затем выполнить любые необходимые операции.

Из документов hdfs3 (https://hdfs3.readthedocs.io/en/latest/quickstart.html):

from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host=<host>, port=<port>)
hdfs.mv(tmp_folder + "*.parquet", path)

Оберните вышеупомянутоев функции, и все готово.

Примечание: я только что использовал hdfs3 в качестве примера. Вы также можете использовать hdfsCLI.

...