Сохранение фрейма данных pyspark после агрегирования с groupBy в виде csv-файла - PullRequest
0 голосов
/ 07 февраля 2019

Я изучаю pyspark, и меня немного смущает вопрос о том, как сохранить сгруппированный фрейм данных в виде файла csv (если предположить, что по некоторым причинам - например, ограничения ОЗУ - я не хочу сначала конвертировать его в фрейм данных Pandas).

Для воспроизводимого примера:

import seaborn as sns
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master('local') \
.appName('Data cleaning') \
.getOrCreate()
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
from pyspark.sql.functions import *

mpg= sns.load_dataset('mpg')
mpg_sp = spark.createDataFrame(mpg)
mpg_grp = mpg_sp.groupBy('model_year', 'origin').avg('displacement', 'weight')

# The command below fails in the sense that it creates a folder with multiple  files in it rather than a single csv file as I would expect

mpg_grp.write.csv('mpg_grp.csv')

# By applying the collect method I get a list which can not be saved as a csv file

mpg_grp1 = mpg_grp.collect()
type(mpg_grp1)
list

Ответы [ 2 ]

0 голосов
/ 07 февраля 2019

Вышеуказанный ответ верен, но результаты его использования не очень хороши.
Конечно, вы можете использовать repartition (1) или coalesce (1), но это приведет к передаче всех ваших данных одному работнику и значительно замедлит работу.вниз вашего кода.
Чтобы избежать этого, я бы посоветовал вам разбить данные на один из столбцов в наборе данных.А затем напишите простой код, чтобы получить один файл на раздел:

cols = ["$name"]
mpg_grp.repartition(cols).write.partitionBy(cols).csv("$location")

Таким образом, данные будут разделены между рабочими по одному из ваших столбцов, и вы получите ровно один файл на ваш раздел (по дате какпример).

0 голосов
/ 07 февраля 2019

Spark - это распределенная структура.Поэтому вывод в нескольких файлах - это нормальное поведение ... каждый работник напишет свою часть, в результате получится несколько маленьких файлов.

Вы можете немного обмануть систему с помощью этой команды:

mpg_grp.coalesce(1).write.csv('mpg_grp.csv')

Это позволит записать только 1 файл (но все еще в папке с именем 'mpg_grp.csv').
Внимание: Это может быть довольно медленно.

...