Написать pyspark. sql .dataframe.DataFrame без потери информации - PullRequest
0 голосов
/ 02 апреля 2020

Я пытаюсь сохранить pyspark. sql .dataframe.DataFrame в формате CSV (также может быть другим форматом, если он легко читается).

Итак, я нашел пару примеров для сохранения DataFrame. Тем не менее, я теряю информацию каждый раз, когда я пишу ее.

Пример набора данных:

# Create an example Pyspark DataFrame

from pyspark.sql import Row

Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('A', 'AA', 'mail1', 100000)
employee2 = Employee('B', 'BB', 'mail2', 120000 )
employee3 = Employee('C', None, 'mail3', 140000 )
employee4 = Employee('D', 'DD', 'mail4', 160000 )
employee5 = Employee('E', 'EE', 'mail5', 160000 )

department1 = Row(id='123', name='HR')
department2 = Row(id='456', name='OPS')
department3 = Row(id='789', name='FN')
department4 = Row(id='101112', name='DEV')

departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2, employee5])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4, employee3])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

departmentsWithEmployees_Seq = [departmentWithEmployees1, departmentWithEmployees2]
dframe = spark.createDataFrame(departmentsWithEmployees_Seq)

Чтобы сохранить этот файл в формате CSV, я сначала попытался это решение :

type(dframe)
Out[]: pyspark.sql.dataframe.DataFrame
dframe.write.csv('junk_mycsv.csv')

К сожалению, это приводит к этой ошибке:

org.apache.spark.sql.AnalysisException: CSV data source does not support struct<id:string,name:string> data type.; 

Это причина, по которой я попробовал другую возможность - преобразовать искровой фрейм данных в pandas фрейм данных и затем сохранить его. Как упоминалось в этом примере.

pandas_df = dframe.toPandas()

Работает хорошо! Однако, если я показываю свои данные, то в них отсутствуют данные:

print(pandas_df.head())

department                                          employees
0   (123, HR)  [(A, AA, mail1, 100000), (B, BB, mail2, 120000...
1  (456, OPS)  [(C, None, mail3, 140000), (D, DD, mail4, 1600...

Как вы можете видеть на снимке ниже, нам не хватает информации. Поскольку данные должны быть такими:

department              employees
0  id:123, name:HR      firstName: A, lastName: AA, email: mail1, salary: 100000

# Info is missing like 'id', 'name', 'firstName', 'lastName', 'email' etc. 
# For the complete expected example, see screenshow below. 

Expected Data format

Только для информации: я работаю в Databricks с Python.

Следовательно, как я могу записать свои данные (dframe из приведенного выше примера) без потери информации?

Заранее большое спасибо!

Edit Добавление изображения для Pault, чтобы показать формат CSV (и заголовки).

Edit2 Замена изображения, например, вывод csv:

После запуска кода Pault:

from pyspark.sql.functions import to_json
dframe.select(*[to_json(c).alias(c) for c in dframe.columns])\
    .repartition(1).write.csv("junk_mycsv.csv", header= True)

Вывод не аккуратный, так как большинство столбцов Заголовки пусты (из-за вложенного формата?). Копируется только первая строка:

department           employees              (empty ColName)     (empty ColName)   (and so on)
{\id\":\"123\"       \"name\":\"HR\"}"     [{\firstName\":\"A\"  \"lastName\":\"AA\"    (...)

1 Ответ

1 голос
/ 02 апреля 2020

Ваш фрейм данных имеет следующую схему:

dframe.printSchema()
#root
# |-- department: struct (nullable = true)
# |    |-- id: string (nullable = true)
# |    |-- name: string (nullable = true)
# |-- employees: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- firstName: string (nullable = true)
# |    |    |-- lastName: string (nullable = true)
# |    |    |-- email: string (nullable = true)
# |    |    |-- salary: long (nullable = true)

Таким образом, столбец department представляет собой StructType с двумя именованными полями, а столбец employees представляет собой массив структур с четырьмя именованными полями. Похоже, что вам нужно записать данные в формате, который сохраняет для каждой записи key и value.

Один из вариантов - записать файл в формате JSON вместо CSV :

dframe.write.json("junk.json")

, который производит следующий вывод:

{"department":{"id":"123","name":"HR"},"employees":[{"firstName":"A","lastName":"AA","email":"mail1","salary":100000},{"firstName":"B","lastName":"BB","email":"mail2","salary":120000},{"firstName":"E","lastName":"EE","email":"mail5","salary":160000}]}
{"department":{"id":"456","name":"OPS"},"employees":[{"firstName":"C","email":"mail3","salary":140000},{"firstName":"D","lastName":"DD","email":"mail4","salary":160000}]}

Или, если вы хотите сохранить его в формате CSV, вы можете использовать to_json для преобразования каждый столбец в JSON перед записью CSV.

# looping over all columns
# but you can also just limit this to the columns you want to convert

from pyspark.sql.functions import to_json
dframe.select(*[to_json(c).alias(c) for c in dframe.columns])\
    .write.csv("junk_mycsv.csv")

Это приводит к следующему выводу:

"{\"id\":\"123\",\"name\":\"HR\"}","[{\"firstName\":\"A\",\"lastName\":\"AA\",\"email\":\"mail1\",\"salary\":100000},{\"firstName\":\"B\",\"lastName\":\"BB\",\"email\":\"mail2\",\"salary\":120000},{\"firstName\":\"E\",\"lastName\":\"EE\",\"email\":\"mail5\",\"salary\":160000}]"
"{\"id\":\"456\",\"name\":\"OPS\"}","[{\"firstName\":\"C\",\"email\":\"mail3\",\"salary\":140000},{\"firstName\":\"D\",\"lastName\":\"DD\",\"email\":\"mail4\",\"salary\":160000}]"

Обратите внимание, что двойные кавычки экранированы.

...