Я использую Spark 2.3, и мне нужно сохранить Spark Dataframe в CSV-файл, и я ищу лучший способ сделать это .. просматривая похожие / похожие вопросы, я нашел этот , но мне нужно более конкретное:
Если DataFrame слишком велик, как я могу избежать использования Pandas?Потому что я использовал функцию toCSV()
(код ниже), и она выдала:
Ошибка нехватки памяти (не удалось выделить память).
Прямая запись в csvлучше использовать файловый ввод / вывод?Может ли это сохранить разделители?
Использование df.coalesce(1).write.option("header", "true").csv('mycsv.csv')
приведет к тому, что заголовок будет записан в каждом файле, а при объединении файлов заголовки будут посередине.Я не прав?
Использование spark write
и затем hadoop getmerge
лучше, чем использование coalesce с точки зрения производительности?
def toCSV(spark_df, n=None, save_csv=None, csv_sep=',', csv_quote='"'):
"""get spark_df from hadoop and save to a csv file
Parameters
----------
spark_df: incoming dataframe
n: number of rows to get
save_csv=None: filename for exported csv
Returns
-------
"""
# use the more robust method
# set temp names
tmpfilename = save_csv or (wfu.random_filename() + '.csv')
tmpfoldername = wfu.random_filename()
print n
# write sparkdf to hadoop, get n rows if specified
if n:
spark_df.limit(n).write.csv(tmpfoldername, sep=csv_sep, quote=csv_quote)
else:
spark_df.write.csv(tmpfoldername, sep=csv_sep, quote=csv_quote)
# get merge file from hadoop
HDFSUtil.getmerge(tmpfoldername, tmpfilename)
HDFSUtil.rmdir(tmpfoldername)
# read into pandas df, remove tmp csv file
pd_df = pd.read_csv(tmpfilename, names=spark_df.columns, sep=csv_sep, quotechar=csv_quote)
os.remove(tmpfilename)
# re-write the csv file with header!
if save_csv is not None:
pd_df.to_csv(save_csv, sep=csv_sep, quotechar=csv_quote)