Конвертировать несколько массивов столбцов структур в pyspark sql - PullRequest
1 голос
/ 28 октября 2019

У меня есть фрейм данных pyspark с несколькими столбцами (около 30) вложенных структур, которые я хочу записать в csv. (struct

Чтобы сделать это, я хочу привести в соответствие все столбцы структуры.

Я проверил несколько ответов здесь:

Pyspark, конвертирующий массив structв строку

PySpark: DataFrame - преобразовать структуру в массив

PySpark преобразовать поле структуры внутри массива в строку

Это структура моего фрейма данных (с около 30 комплексными ключами):

root  
 |-- 1_simple_key: string (nullable = true)  
 |-- 2_simple_key: string (nullable = true)  
 |-- 3_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  
 |-- 4_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  
 |-- 5_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  

Предлагаемые решения для одного столбца, и я не могу принять его для нескольких столбцов.

Я хочу сделать что-то такого типа:
1. Для каждого struct_column:
2. col = stringify (struct_column)

Я не против создать для него дополнительный фрейм данных. Я простонеобходимо подготовить его для записи в формате csv.

Минимальный воспроизводимый пример:

from pyspark.sql import Row
d = d = {'1_complex_key': {0: Row(type='1_complex_key', s=Row(n1=False, n2=False, n3=True), x=954, y=238), 1: Row(type='1_complex_key', s=Row(n1=False, n2=False, n3=True), x=956, y=250), 2: Row(type='1_complex_key', s=Row(n1=True, n2=False, n3=False), x=886, y=269)}, '2_complex_key': {0: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=901, y=235), 1: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=905, y=249), 2: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=868, y=270)}, '3_complex_key': {0: Row(type='3_complex_key', s=Row(n1=True, n2=False, n3=False), x=925, y=197), 1: Row(type='3_complex_key', s=Row(n1=False, n2=False, n3=True), x=928, y=206), 2: Row(type='3_complex_key', s=Row(n1=False, n2=False, n3=True), x=883, y=236)}}
df = pd.DataFrame.from_dict(d)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
s_df = spark.createDataFrame(df)
s_df.printSchema()
s_df.write.csv('it_doesnt_write.csv')

enter image description here enter image description here

Итак - подведем итог: у меня есть искровой фрейм данных, который я хочу записать в CSV. Я не могу записать его в CSV, потому что:

'CSV data source does not support struct<s:struct<n1:boolean,n2:boolean,n3:boolean>,type:string,x:bigint,y:bigint> data type.;'

Итак, я хочу выполнить некоторые действия / обратимые преобразования для этого кадра данных, чтобы я мог записать его в CSV, а затем прочитать его из CSV и сделать его искровым кадром данных с той же схемой.

Как я могу это сделать? Спасибо

1 Ответ

0 голосов
/ 03 ноября 2019

Поскольку pault уже упоминалось в комментариях, вам необходимо понимание списка. Такое понимание списка требует списка столбцов и функции, которая преобразует эти столбцы в строки. Я буду использовать df.columns и to_json , но вы также можете предоставить свой собственный список имен столбцов в Python и пользовательскую функцию для строкового преобразования ваших сложных столбцов.

#this converts all columns to json strings
#and writes it as to disk
s_df.select([F.to_json(x) for x in s_df.columns]).coalesce(1).write.csv('/tmp/testcsv')

В случае, если вы неЕсли вы хотите применить to_json ко всем столбцам, вы можете просто изменить его следующим образом:

list4tojson = ['2_complex_key', '3_complex_key']
s_df.select('1_complex_key', *[F.to_json(x) for x in list4tojson]).coalesce(1).write.csv('/tmp/testcsv')

Вы можете восстановить фрейм данных с помощью from_json :

df = spark.read.csv('/tmp/testcsv')
df.printSchema()
#root
# |-- _c0: string (nullable = true)
# |-- _c1: string (nullable = true)
# |-- _c2: string (nullable = true)

#interfering the schema
json_schema = spark.read.json(df.rdd.map(lambda row: row._c0)).schema

df.select([F.from_json(x, json_schema) for x in df.columns] ).printSchema()
#root
# |-- jsontostructs(_c0): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
# |-- jsontostructs(_c1): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
# |-- jsontostructs(_c2): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)

В случаевы просто хотите хранить свои данные в удобочитаемом формате, вы можете избежать всего вышеприведенного кода, написав его непосредственно в json:

s_df.coalesce(1).write.json('/tmp/testjson')

df = spark.read.json('/tmp/testjson')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...