Запись большого Spark Dataframe в CSV-файл - PullRequest
0 голосов
/ 06 июня 2018

Я использую Spark 2.3, и мне нужно сохранить Spark Dataframe в CSV-файл, и я ищу лучший способ сделать это .. просматривая похожие / похожие вопросы, я нашел этот , но мне нужно более конкретное:

Если DataFrame слишком велик, как я могу избежать использования Pandas?Потому что я использовал функцию toCSV() (код ниже), и она выдала:

Ошибка нехватки памяти (не удалось выделить память).

Прямая запись в csvлучше использовать файловый ввод / вывод?Может ли это сохранить разделители?

Использование df.coalesce(1).write.option("header", "true").csv('mycsv.csv') приведет к тому, что заголовок будет записан в каждом файле, а при объединении файлов заголовки будут посередине.Я не прав?

Использование spark write и затем hadoop getmerge лучше, чем использование coalesce с точки зрения производительности?

def toCSV(spark_df, n=None, save_csv=None, csv_sep=',', csv_quote='"'):
        """get spark_df from hadoop and save to a csv file

        Parameters
        ----------
        spark_df: incoming dataframe
        n: number of rows to get
        save_csv=None: filename for exported csv

        Returns
        -------

        """

        # use the more robust method
        # set temp names
        tmpfilename = save_csv or (wfu.random_filename() + '.csv')
        tmpfoldername = wfu.random_filename()
        print n
        # write sparkdf to hadoop, get n rows if specified
        if n:
            spark_df.limit(n).write.csv(tmpfoldername, sep=csv_sep, quote=csv_quote)
        else:
            spark_df.write.csv(tmpfoldername, sep=csv_sep, quote=csv_quote)

        # get merge file from hadoop
        HDFSUtil.getmerge(tmpfoldername, tmpfilename)
        HDFSUtil.rmdir(tmpfoldername)

        # read into pandas df, remove tmp csv file
        pd_df = pd.read_csv(tmpfilename, names=spark_df.columns, sep=csv_sep, quotechar=csv_quote)
        os.remove(tmpfilename)

        # re-write the csv file with header!
        if save_csv is not None:
            pd_df.to_csv(save_csv, sep=csv_sep, quotechar=csv_quote)

Ответы [ 2 ]

0 голосов
/ 06 июня 2018

Мы использовали библиотеку данных.Работает нормально

df.save("com.databricks.spark.csv", SaveMode.Overwrite, Map("delimiter" -> delim, "nullValue" -> "-", "path" -> tempFPath))

Библиотека:

<!-- spark df to csv -->
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>
0 голосов
/ 06 июня 2018

Если DataFrame слишком велик, как я могу избежать использования Pandas?

Вы можете просто сохранить файл в HDFS или S3 или в любое другое распределенное хранилище.

Является ли прямая запись в CSV с использованием файлового ввода-вывода лучшим способом?Может ли он сохранить разделители?

Если вы подразумеваете под этим сохранение файла в локальное хранилище - это все равно вызовет исключение OOM, так как вам потребуется переместить все данные в памяти на локальном компьютере, чтобы сделать это,

Использование df.coalesce (1) .write.option ("header", "true"). Csv ('mycsv.csv') приведет к тому, что заголовок будет записан в каждом файле, и когдафайлы объединены, заголовки будут посередине.Я не прав?

В этом случае у вас будет только 1 файл (так как вы делаете coalesce(1)).Так что вам не нужно заботиться о заголовках.Вместо этого - вы должны заботиться о памяти на исполнителях - вы можете получить OOM на исполнителе, поскольку все данные будут перемещены к этому исполнителю.

Использование spark write и затем hasoop getmerge лучше, чем использование coalesceс точки зрения производительности?

Определенно лучше (но не используйте coalesce()).Spark будет эффективно записывать данные в хранилище, затем HDFS будет дублировать данные, и после этого getmerge сможет эффективно считывать данные с узлов и объединять их.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...