Приведение столбца к JSON / dict и выравнивание значений JSON в столбце в pyspark - PullRequest
0 голосов
/ 01 июня 2019

Я новичок в Pyspark и выясняю, как привести тип столбца к типу dict, а затем выровнять этот столбец по нескольким столбцам, используя explode.

Вот как выглядит мой фрейм данных:

   col1    | col2        |
    -----------------------
    test:1  | {"test1":[{"Id":"17","cName":"c1"},{"Id":"01","cName":"c2","pScore":0.003609}],
{"test8":[{"Id":"1","cName":"c11","pScore":0.0},{"Id":"012","cName":"c2","pScore":0.003609}]
    test:2  | {"test1:subtest2":[{"Id":"18","cName":"c13","pScore":0.00203}]}

В настоящий момент схема этого кадра данных имеет вид

root
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)

Вывод, который я хочу получить, выглядит следующим образом:

col1   | col2           | Id | cName | pScore  |
------------------------------------------------
test:1 | test1          | 17 | c1    | null    | 
test:1 | test1          | 01 | c2    | 0.003609|
test:1 | test8          | 1  | c11   | 0.0     |
test:1 | test8          | 012| c2    | 0.003609|
test:2 | test1:subtest2 | 18 | c13   | 0.00203 | 

У меня проблемы с определениемправильная схема для col2 для приведения ее типа от String до json или dict.И затем я хотел бы иметь возможность разбивать значения на несколько столбцов, как показано выше.Любая помощь будет принята с благодарностью.Я использую Spark 2.0 +.

Спасибо!

Ответы [ 2 ]

1 голос
/ 02 июня 2019

Обновляя свой ответ, я использовал udf, чтобы поместить ключ в массив, а затем взорваться, чтобы достичь желаемого результата

См. Пример ниже:

import json
import re

import pyspark.sql.functions as f
from pyspark.shell import spark
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType

df = spark.createDataFrame([
    ('test:1',
     '{"test1":[{"Id":"17","cName":"c1"},{"Id":"01","cName":"c2","pScore":0.003609}]},'
     '{"test8":[{"Id":"1","cName":"c11","pScore":0.0},{"Id":"012","cName":"c2","pScore":0.003609}]}'),
    ('test:2', '{"test1:subtest2":[{"Id":"18","cName":"c13","pScore":0.00203}]}')
], ['col1', 'col2'])

schema = ArrayType(
    StructType(
        [
            StructField("Col", StringType()),
            StructField("Id", StringType()),
            StructField("cName", StringType()),
            StructField("pScore", DoubleType())
        ]
    )
)


@f.udf(returnType=schema)
def parse_col(column):
    updated_values = []

    for it in re.finditer(r'{.*?}]}', column):
        parse = json.loads(it.group())
        for key, values in parse.items():
            for value in values:
                value['Col'] = key
                updated_values.append(value)

    return updated_values


df = df \
    .withColumn('tmp', parse_col(f.col('col2'))) \
    .withColumn('tmp', f.explode(f.col('tmp'))) \
    .select(f.col('col1'),
            f.col('tmp').Col.alias('col2'),
            f.col('tmp').Id.alias('Id'),
            f.col('tmp').cName.alias('cName'),
            f.col('tmp').pScore.alias('pScore'))

df.show()

Вывод:

+------+--------------+---+-----+--------+
|  col1|          col2| Id|cName|  pScore|
+------+--------------+---+-----+--------+
|test:1|         test1| 17|   c1|    null|
|test:1|         test1| 01|   c2|0.003609|
|test:1|         test8|  1|  c11|     0.0|
|test:1|         test8|012|   c2|0.003609|
|test:2|test1:subtest2| 18|  c13| 0.00203|
+------+--------------+---+-----+--------+
1 голос
/ 01 июня 2019

Поскольку разные имена ключей для каждой строки в JSON, определение общей схемы для json не будет работать хорошо, я считаю, что лучше обрабатывать это с помощью UDF:

import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql import Row
import json

def extract_key(dumped_json):
    """
    Extracts the single key from the dumped json (as a string).
    """
    if dumped_json is None:
        return None

    d = json.loads(dumped_json)

    try:
        return list(d.keys())[0]
    except IndexError:
        return None

def extract_values(dumped_json):
    """
    Extracts the single array value from the dumped json and parses each element
    of the array as a spark Row.
    """
    if dumped_json is None:
        return None

    d = json.loads(dumped_json)
    try:
        return [Row(**_d) for _d in list(d.values())[0]]
    except IndexError:
        return None

# Definition of the output type of the `extract_values` function
output_values_type = t.ArrayType(t.StructType(
 [t.StructField("Id", t.StringType()), 
  t.StructField("cName", t.StringType()), 
  t.StructField("pScore", t.DoubleType())]
))

# Define UDFs
extract_key_udf = f.udf(extract_key, t.StringType())
extract_values_udf = f.udf(extract_values, output_values_type)

# Extract values and keys
extracted_df = df.withColumn("values", extract_values_udf("col2")). \
 withColumn("col2", extract_key_udf("col2"))
# Explode the array
exploded_df = extracted_df.withColumn("values", f.explode("values"))

# Select the wanted columns
final_df = exploded_df.select("col1", "col2", "values.Id", "values.cName", 
                              "values.pScore")

результат, как и хотелось:

+------+--------------+---+-----+--------+
|col1  |col2          |Id |cName|pScore  |
+------+--------------+---+-----+--------+
|test:1|test1:subtest1|17 |c1   |0.002034|
|test:1|test1:subtest1|01 |c2   |0.003609|
|test:2|test1:subtest2|18 |c13  |0.00203 |
+------+--------------+---+-----+--------+
...