как преобразовать pyspark rdd в Dataframe - PullRequest
0 голосов
/ 13 июля 2020

У меня есть df фрейма данных, как показано ниже

df =

+---+---+----+---+---+
|  a|  b|   c|  d|  e|
+---+---+----+---+---+
|  1|  a|foo1|  4|  5|
|  2|  b| bar|  4|  6|
|  3|  c| mnc|  4|  7|
|  4|  c| mnc|  4|  7|
+---+---+----+---+---+

Я хочу добиться чего-то вроде df1 =

+---+---+-----------------------------------------------+
|  a|  b|   c                                           |
+---+---+-----------------------------------------------+
|  1|  a|{'a': 1, 'b': 'a', 'c': 'foo1', 'd': 4, 'e': 5}|                            
|  2|  b|{'a': 2, 'b': 'b', 'c': 'bar', 'd': 4, 'e': 6} |                                       
|  3|  c|{'a': 3, 'b': 'c', 'c': 'mnc', 'd': 4, 'e': 7} |                                       
|  4|  c|{'a': 4, 'b': 'c', 'c': 'mnc', 'd': 4, 'e': 7} |                                       
+---+---+-----------------------------------------------+

Я действительно хотел избежать group by, поэтому я подумал сначала преобразовать фрейм данных в rdd и снова преобразовать в них один фрейм данных

Фрагмент кода, который я написал, был

df2=df.rdd.flatMap(lambda x:(x.a,x.b,x.asDict()))

при выполнении foreach на df2, я получаю результат в формате rdd Итак, я попытался создать из него фрейм данных.

df3=df2.toDF() #1st way
df3=sparkSession.createDataframe(df2) #2nd way

Но я получаю ошибку в обоих направлениях. Может кто-нибудь объяснить, что я здесь делаю не так и как добиться своего восстановления

Ответы [ 2 ]

3 голосов
/ 13 июля 2020

Может быть выполнено с помощью искры sql, как показано ниже:

Искра SQL

data.createOrReplaceTempView("data")
spark.sql("""
select a, b, to_json(named_struct('a',a, 'b',b,'c',c,'d',d,'e',e)) as c
from data""").show(20,False)

Выход

# +---+---+----------------------------------------+
# |a  |b  |c                                       |
# +---+---+----------------------------------------+
# |1  |a  |{"a":1,"b":"a","c":"foo1","d":"4","e":5}|
# |2  |b  |{"a":2,"b":"b","c":"bar","d":"4","e":6} |
# |3  |c  |{"a":3,"b":"c","c":"mnc","d":"4","e":7} |
# |4  |c  |{"a":4,"b":"c","c":"mnc","d":"4","e":7} |
# +---+---+----------------------------------------+

Datframe API

result = data\
 .withColumn('c',to_json(struct(data.a,data.b,data.c,data.d,data.e)))\
 .select("a","b","c")
result.show(20,False)

Вывод

# +---+---+----------------------------------------+
# |a  |b  |c                                       |
# +---+---+----------------------------------------+
# |1  |a  |{"a":1,"b":"a","c":"foo1","d":"4","e":5}|
# |2  |b  |{"a":2,"b":"b","c":"bar","d":"4","e":6} |
# |3  |c  |{"a":3,"b":"c","c":"mnc","d":"4","e":7} |
# |4  |c  |{"a":4,"b":"c","c":"mnc","d":"4","e":7} |
# +---+---+----------------------------------------+
0 голосов
/ 13 июля 2020

Вы можете создать столбец json из столбца типа карты

import pyspark.sql.functions as F
df = sqlContext.createDataFrame(
    [(0, 1, 23, 4, 8, 9, 5, "b1"), (1, 2, 43, 8, 10, 20, 43, "e1")], 
    ("id", "a1", "b1", "c1", "d1", "e1", "f1", "ref")
)
tst = [[F.lit(c),F.col(c)] for c in df.columns]
tst_flat =[item for sublist in tst for item in sublist]
#%%

map_coln = F.create_map(*tst_flat)

df1=df.withColumn("out",F.to_json(map_coln))

результат:

In [37]: df1.show(truncate=False)
+---+---+---+---+---+---+---+---+-------------------------------------------------------------------------------+
|id |a1 |b1 |c1 |d1 |e1 |f1 |ref|out                                                                            |
+---+---+---+---+---+---+---+---+-------------------------------------------------------------------------------+
|0  |1  |23 |4  |8  |9  |5  |b1 |{"id":"0","a1":"1","b1":"23","c1":"4","d1":"8","e1":"9","f1":"5","ref":"b1"}   |
|1  |2  |43 |8  |10 |20 |43 |e1 |{"id":"1","a1":"2","b1":"43","c1":"8","d1":"10","e1":"20","f1":"43","ref":"e1"}|
+---+---+---+---+---+---+---+---+-------------------------------------------------------------------------------+
...