Прочитать Pyspark Struct Json Столбец Необязательные элементы - PullRequest
3 голосов
/ 08 июля 2020

У меня есть паркетный файл, в одном из столбцов которого находится поле структуры, в котором хранится json. Структура показана ниже.

originator: struct (nullable = true)
    |-- originatorDetail: struct (nullable = true)
    |    |-- applicationDeployedId: string (nullable = true)
    |    |-- applicationDeployedNameVersion: string (nullable = true)
    |    |-- applicationNameVersion: string (nullable = true)
    |    |-- cloudHost: string (nullable = true)
    |    |-- cloudRegion: string (nullable = true)
    |    |-- cloudStack: string (nullable = true)
    |    |-- version: string (nullable = true)
    |-- Orversion: string (nullable = true)

В json требуется только поле версии, а остальные поля не обязательны. Таким образом, некоторые записи могут иметь только 2 элемента и оставаться действительными.

Предположим, я хочу прочитать поле CloudHost. Я могу прочитать его как originator.originatorDetail.cloudHost. Но для записей, где этого необязательного поля нет. Он потерпит неудачу, поскольку элемента нет. Есть ли способ прочитать это необязательное значение как null для записей, в которых значения отсутствуют, без использования udf.

Некоторые примеры

originator": {
    "originatorDetail": {
      "applicationDeployedId": "PSLV",
      "cloudRegion": "Mangal",
      "cloudHost": "Petrol",
      "applicationNameVersion": "CRDI",
      "applicationDeployedNameVersion": "Tuna",
      "cloudStack": "DEV",
      "version": "1.1.0"
    },
    Orversion": "version.1"
  }
  -------------
 originator": {
    "originatorDetail": {
      "version": "1.1.0"
    },
    Orversion": "version.1"
  }

Требуемый вывод

applicationDeployedId applicationDeployedNameVersion  applicationNameVersion cloudHost cloudRegion cloudStack version  Orversion
 PSLV                   Tuna                            CRDI                   Petrol    Mangal       DEV       1.1.0    version.1
                                                                                                                1.1.0    version.1

1 Ответ

2 голосов
/ 08 июля 2020

Используйте функцию from_json из Spark-2.4 +

Прочтите данные паркета, затем используйте from_json, передав схема , которая соответствует вашему столбцу json.

Spark прочитает совпадающие данные и добавит несовпадающие поля с нулевыми значениями.

Example:

df.show(10,False)
#+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|id |json_data                                                                                                                                                                                                                                                      #|
#+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|1  |{"originator": {"originatorDetail": {"applicationDeployedId": "PSLV","cloudRegion": "Mangal","cloudHost": "Petrol","applicationNameVersion": "CRDI","applicationDeployedNameVersion": "Tuna","cloudStack": "DEV","version": "1.1.0"},"Orversion": "version.1"}}|
#|2  |{"originator": {    "originatorDetail": {      "version": "1.1.0"    },    "Orversion": "version.1"}}                                                                                                                                                          |
#+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
schema=StructType([StructField("originator",StructType([StructField("Orversion",StringType(),True),
            StructField("originatorDetail",StructType([StructField("applicationDeployedId",StringType(),True),
            StructField("applicationDeployedNameVersion",StringType(),True),
            StructField("applicationNameVersion",StringType(),True),
            StructField("cloudHost",StringType(),True),
            StructField("cloudRegion",StringType(),True),
            StructField("cloudStack",StringType(),True),
            StructField("version",StringType(),True)]),True)]),True)])

from pyspark.sql.functions import *
from pyspark.sql.types import *

#then read the json_data column using from_json function
df.withColumn("json_converted",from_json(col("json_data"),schema)).select("id","json_converted").show(10,False)
#+---+--------------------------------------------------------+
#|id |json_converted                                          |
#+---+--------------------------------------------------------+
#|1  |[[version.1, [PSLV, Tuna,, Petrol, Mangal, DEV, 1.1.0]]]|
#|2  |[[version.1, [,,,,,, 1.1.0]]]                           |
#+---+--------------------------------------------------------+

df.withColumn("json_converted",from_json(col("json_data"),schema)).select("id","json_converted").printSchema()
#root
# |-- id: long (nullable = true)
# |-- json_converted: struct (nullable = true)
# |    |-- originator: struct (nullable = true)
# |    |    |-- Orversion: string (nullable = true)
# |    |    |-- originatorDetail: struct (nullable = true)
# |    |    |    |-- applicationDeployedId: string (nullable = true)
# |    |    |    |-- applicationDeployedNameVersion: string (nullable = true)
# |    |    |    |-- applicationNameVersi: string (nullable = true)
# |    |    |    |-- cloudHost: string (nullable = true)
# |    |    |    |-- cloudRegion: string (nullable = true)
# |    |    |    |-- cloudStack: string (nullable = true)
# |    |    |    |-- version: string (nullable = true)

#even though we don't have all fields from id=2 still we added fields
df.withColumn("json_converted",from_json(col("json_data"),schema)).select("json_converted.originator.originatorDetail.applicationDeployedId").show(10,False)
#+---------------------+
#|applicationDeployedId|
#+---------------------+
#|PSLV                 |
#|null                 |
#+---------------------+
...