Использование Pyspark для чтения элементов JSON из массива? - PullRequest
0 голосов
/ 13 мая 2019

У меня есть некоторые проблемы с чтением элементов из Cosmos DB в блоках данных, кажется, что JSON читается как строковое значение, и возникают некоторые проблемы при выводе данных из него в столбцы.

У меня есть столбец с именем ProductRanges со следующими значениями в строке:

[   {
        "name": "Red",
        "min": 0,
        "max": 99,
        "value": "Order More"
    },
    {
        "name": "Amber",
        "min": 100,
        "max": 499,
        "value": "Stock OK"
    },
    {
        "name": "Green",
        "min": 500,
        "max": 1000000,
        "value": "Overstocked"
    }
]

В Cosmos DB документ JSON действителен, при импорте данных тип данных в кадре данных является строкой, а не объектом / структурой JSON, как я ожидал.

Мне бы хотелось, чтобы можно было подсчитать, сколько раз появляется «имя» и перебирать их, получая элементы min, max и value, так как количество диапазонов, которые мы можем иметь, может быть больше 3. Я побывал в нескольких постах на stackoverflow и других местах, но застрял на форматировании. Я попытался использовать разнесение и прочитать схему, основываясь на значениях столбца, но в ней говорится «в документе vaild», думаю, это может быть связано с тем, что Pyspark нужен {} в начале и в конце, но даже конкатенирует его в SQL-запрос из базы данных cosmos по-прежнему заканчивается типом строки.

Любые указатели будут оценены

Ответы [ 2 ]

1 голос
/ 14 мая 2019

Я вижу, что вы извлекли документы JSON из Azure CosmosDB и преобразовали их в PySpark DataFrame, но вложенный документ или массив JSON не удалось преобразовать как объект JSON в столбце DataFrame, как вы ожидали, поскольку не определен тип JSONв модуле pyspark.sql.types, как показано ниже.

enter image description here

Я искал документ PySpark: Convert JSON String Column to Array of Object (StructType) in Data Frame, который подойдет для вашего решения.текущий случай, даже такой, как вы хотите, пока я пытался его решить.

В приведенном выше документе показано, как использовать ArrayType, StructType, StructField и другие базовые типы данных PySpark для преобразованияСтрока JSON в столбце с комбинированным типом данных, который может быть легче обработан в PySpark с помощью определения схемы столбца и UDF.

Вот сводка примера кода.Надеюсь, это поможет.

source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]

JSON считывается во фрейм данных через sqlContext.Вывод:

+------+--------------------+

|attr_1|              attr_2|

+------+--------------------+

|     1|[{"a":1,"b":1},{"...|

|     2|[{"a":3,"b":3},{"...|

+------+--------------------+


root
  |-- attr_1: long (nullable = true)
  |-- attr_2: string (nullable = true)

Затем, чтобы преобразовать столбец attr_2 через схему определения столбца и UDF.

# Function to convert JSON array string to a list
import json

def parse_json(array_str):
    json_obj = json.loads(array_str)
    for item in json_obj:
        yield (item["a"], item["b"])

# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField

json_schema = ArrayType(StructType([StructField('a', IntegerType(
), nullable=False), StructField('b', IntegerType(), nullable=False)]))

# Define udf
from pyspark.sql.functions import udf

udf_parse_json = udf(lambda str: parse_json(str), json_schema)

# Generate a new data frame with the expected schema

df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
df_new.show()
df_new.printSchema()

Вывод выглядит следующим образом:

+------+--------------+

|attr_1|        attr_2|

+------+--------------+

|     1|[[1,1], [2,2]]|

|     2|[[3,3], [4,4]]|

+------+--------------+


root
  |-- attr_1: long (nullable = true)
  |-- attr_2: array (nullable = true)
  |    |-- element: struct (containsNull = true)
  |    |    |-- a: integer (nullable = false)
  |    |    |-- b: integer (nullable = false)
0 голосов
/ 14 мая 2019

Исходя из данных json, вы можете просмотреть схему вашего фрейма данных с помощью printSchema и использовать ее Рассмотрим пример ниже:

{"Id":11,"data":[{"package":"com.browser1","activetime":60000},{"package":"com.browser6","activetime":1205000},{"package":"com.browser7","activetime":1205000}]}
{"Id":12,"data":[{"package":"com.browser1","activetime":60000},{"package":"com.browser6","activetime":1205000}]} 
......

appActiveTime.printSchema()
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- activetime: long (nullable = true)
 |    |    |-- package: string (nullable = true)

Поскольку у вас есть массив, вам нужно разбить данные и выбрать поле структуры, как показано ниже

import org.apache.spark.sql.functions._
appActiveTime.withColumn("data", explode($"data"))
       .select("data.*")
       .show(false)

Вывод будет выглядеть так:

+----------+------------+
|activetime|     package|
+----------+------------+
|     60000|com.browser1|
|   1205000|com.browser6|
|   1205000|com.browser7|
|     60000|com.browser1|
|   1205000|com.browser6|
+----------+------------+

Надеюсь, это поможет.

...