Я пытаюсь сохранить pyspark. sql .dataframe.DataFrame в формате CSV (также может быть другим форматом, если он легко читается).
Итак, я нашел пару примеров для сохранения DataFrame. Тем не менее, я теряю информацию каждый раз, когда я пишу ее.
Пример набора данных:
# Create an example Pyspark DataFrame
from pyspark.sql import Row
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('A', 'AA', 'mail1', 100000)
employee2 = Employee('B', 'BB', 'mail2', 120000 )
employee3 = Employee('C', None, 'mail3', 140000 )
employee4 = Employee('D', 'DD', 'mail4', 160000 )
employee5 = Employee('E', 'EE', 'mail5', 160000 )
department1 = Row(id='123', name='HR')
department2 = Row(id='456', name='OPS')
department3 = Row(id='789', name='FN')
department4 = Row(id='101112', name='DEV')
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2, employee5])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4, employee3])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])
departmentsWithEmployees_Seq = [departmentWithEmployees1, departmentWithEmployees2]
dframe = spark.createDataFrame(departmentsWithEmployees_Seq)
Чтобы сохранить этот файл в формате CSV, я сначала попытался это решение :
type(dframe)
Out[]: pyspark.sql.dataframe.DataFrame
dframe.write.csv('junk_mycsv.csv')
К сожалению, это приводит к этой ошибке:
org.apache.spark.sql.AnalysisException: CSV data source does not support struct<id:string,name:string> data type.;
Это причина, по которой я попробовал другую возможность - преобразовать искровой фрейм данных в pandas фрейм данных и затем сохранить его. Как упоминалось в этом примере.
pandas_df = dframe.toPandas()
Работает хорошо! Однако, если я показываю свои данные, то в них отсутствуют данные:
print(pandas_df.head())
department employees
0 (123, HR) [(A, AA, mail1, 100000), (B, BB, mail2, 120000...
1 (456, OPS) [(C, None, mail3, 140000), (D, DD, mail4, 1600...
Как вы можете видеть на снимке ниже, нам не хватает информации. Поскольку данные должны быть такими:
department employees
0 id:123, name:HR firstName: A, lastName: AA, email: mail1, salary: 100000
# Info is missing like 'id', 'name', 'firstName', 'lastName', 'email' etc.
# For the complete expected example, see screenshow below.
Только для информации: я работаю в Databricks с Python.
Следовательно, как я могу записать свои данные (dframe из приведенного выше примера) без потери информации?
Заранее большое спасибо!
Edit Добавление изображения для Pault, чтобы показать формат CSV (и заголовки).
Edit2 Замена изображения, например, вывод csv:
После запуска кода Pault:
from pyspark.sql.functions import to_json
dframe.select(*[to_json(c).alias(c) for c in dframe.columns])\
.repartition(1).write.csv("junk_mycsv.csv", header= True)
Вывод не аккуратный, так как большинство столбцов Заголовки пусты (из-за вложенного формата?). Копируется только первая строка:
department employees (empty ColName) (empty ColName) (and so on)
{\id\":\"123\" \"name\":\"HR\"}" [{\firstName\":\"A\" \"lastName\":\"AA\" (...)