Читайте один столбец как строки json, а другой как обычный, используя фрейм данных pyspark - PullRequest
0 голосов
/ 20 июня 2019

У меня есть такой фрейм данных:

col1    | col2        |
-----------------------
test:1  | {"test1:subtest1":[{"Id":"17","cName":"c1"}], "test1:subtest2":[{"Id":"01","cName":"c2"}]}
test:2  | {"test1:subtest2":[{"Id":"18","cName":"c13","pScore":0.00203}]}

Мне нужен такой вывод:

col1   | col2           | Id | cName | pScore  |
------------------------------------------------
test:1 | test1:subtest1 | 17 | c1    | null    | 
test:1 | test1:subtest2 | 01 | c2    | null    | 
test:2 | test1:subtest2 | 18 | c13   | 0.00203 | 

Это продолжение этого вопроса - Приведение столбцак JSON / dict и выравниванию значений JSON в столбце в pyspark

Я новичок в pyspark и буду признателен за любую помощь в этом.Я попробовал решение, данное в этом посте.Это продолжало давать мне ошибки

TypeError: type object argument after ** must be a mapping, not list

Я также попробовал следующее:

test = sqlContext.read.json(df.rdd.map(lambda r: r.col2))

Но это дало мне вывод, подобный следующему:

 test1:subtest1      | test1:subtest2        |
----------------------------------------------
[{"Id":"17","cName":"c1"}] | [{"Id":"01","cName":"c2"}]
null                       | [{"Id":"18","cName":"c13","pScore":0.00203}]

Язастрял на том, как я могу использовать ^ выше, чтобы присоединиться к col1 и получить желаемый результат.

Любая помощь очень ценится, заранее спасибо !!

1 Ответ

1 голос
/ 21 июня 2019

Вы можете использовать функцию from_json () , ключом является определение json_schema, которую вы можете создать вручную, или, если вы используете pyspark 2.4+, вы можете использовать функцию schema_of_json () (приведенный ниже код протестирован в pyspark 2.4.0 ):

from pyspark.sql import functions as F

# define all keys with a list:
my_keys = ['test1:subtest1', 'test1:subtest2']

# find a sample json_code for a single key with all sub-fields and then construct its json_schema
key_schema = df.select(F.schema_of_json('{"test1:subtest1":[{"Id":"17","cName":"c1","pScore":0.00203}]}').alias('schema')).first().schema

>>> key_schema
u'struct<test1:subtest1:array<struct<Id:string,cName:string,pScore:double>>>'

# use the above sample key_schema to create the json_schema for all keys
schema = u'struct<' + ','.join([r'`{}`:array<struct<Id:string,cName:string,pScore:double>>'.format(k) for k in my_keys]) + r'>'

>>> schema 
u'struct<`test1:subtest1`:array<struct<Id:string,cName:string,pScore:double>>,`test1:subtest2`:array<struct<Id:string,cName:string,pScore:double>>>'

Примечание: необходимо добавлять кавычки для окружения имени поля, когда оно содержит специальные символыкак :.

После того, как у нас есть схема, данные json могут быть получены из col2:

df1 = df.withColumn('data', F.from_json('col2', schema)).select('col1', 'data.*')

>>> df1.printSchema()
root
 |-- col1: string (nullable = true)
 |-- test1:subtest1: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Id: string (nullable = true)
 |    |    |-- cName: string (nullable = true)
 |    |    |-- pScore: double (nullable = true)
 |-- test1:subtest2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Id: string (nullable = true)
 |    |    |-- cName: string (nullable = true)
 |    |    |-- pScore: double (nullable = true)

>>> df1.show(2,0)
+------+--------------+--------------------+
|col1  |test1:subtest1|test1:subtest2      |
+------+--------------+--------------------+
|test:1|[[17, c1,]]   |[[01, c2,]]         |
|test:2|null          |[[18, c13, 0.00203]]|
+------+--------------+--------------------+

Затем вы можете использовать select иобъединение, чтобы нормализовать фрейм данных:

df_new = df1.select('col1', F.lit('test1:subtest1').alias('col2'), F.explode(F.col('test1:subtest1')).alias('arr')) \
            .union(
                df1.select('col1', F.lit('test1:subtest2'), F.explode(F.col('test1:subtest2')))
           ).select('col1', 'col2', 'arr.*')  

>>> df_new.show()
+------+--------------+---+-----+-------+
|  col1|          col2| Id|cName| pScore|
+------+--------------+---+-----+-------+
|test:1|test1:subtest1| 17|   c1|   null|
|test:1|test1:subtest2| 01|   c2|   null|
|test:2|test1:subtest2| 18|  c13|0.00203|
+------+--------------+---+-----+-------+

использовать limit ()

Когда в строках json много уникальных ключей, используйте функцию Reduce для создания df_new:

from functools import reduce     

df_new = reduce(lambda x,y: x.union(y)
          , [ df1.select('col1', F.lit(k).alias('col2'), F.explode(F.col(k)).alias('arr')) for k in my_keys ]
         ).select('col1', 'col2', 'arr.*')
...