Разделить строку в кадре PySpark - PullRequest
0 голосов
/ 03 декабря 2018

Я хочу разбить столбец в фрейме данных PySpark, столбец (тип строки) выглядит следующим образом:

[{"quantity":25,"type":"coins","balance":35}]
[{"balance":40,"type":"coins","quantity":25}]
[{"quantity":2,"type":"column_breaker","balance":2},{"quantity":2,"type":"row_breaker","balance":2},{"quantity":2,"type":"single_block_breaker","balance":2},{"quantity":1,"type":"rainbow","balance":1},{"quantity":135,"type":"coins","balance":140}]

Таким образом, некоторые из них имеют набор "quantity, type, balance", а некоторые из нихесть несколько таких записей.Я попытался обработать его как переменную JSON и разделить:

schema = StructType(
[
    StructField('balance', StringType(), True),
    StructField('type', StringType(), True),
    StructField('quantity', StringType(), True)
 ]
 )

temp = merger.withColumn("data", 
from_json("items",schema)).select("items", col('data.*'))
display(temp)

Но он мог разделить наблюдения только одним набором.Я хотел бы получить вывод, подобный

balance|quantity|type
   35  |   25   |coins
   40  |   25   |coins
.......

, чтобы наблюдения с одним набором делились на одно наблюдение, а наблюдения с несколькими наборами - на несколько наблюдений с вертикальным расположением.

Кроме того, как я могу выделить каждое наблюдение после разделения на несколько строк?Скажем, у меня есть другая переменная с идентификатором, как я могу присвоить идентификатор обратно?

Ответы [ 2 ]

0 голосов
/ 04 декабря 2018

Вы можете использовать библиотеку json и использовать rdd.flatMap () для разбора и разбиения массива строк json на несколько строк

import json

data = [("[{\"quantity\":25,\"type\":\"coins\",\"balance\":35}]",),
         ("[{\"balance\":40,\"type\":\"coins\",\"quantity\":25}]",),
    ("[{\"quantity\":2,\"type\":\"column_breaker\",\"balance\":2},{\"quantity\":2,\"type\":\"row_breaker\",\"balance\":2},{\"quantity\":2,\"type\":\"single_block_breaker\",\"balance\":2},{\"quantity\":1,\"type\":\"rainbow\",\"balance\":1},{\"quantity\":135,\"type\":\"coins\",\"balance\":140}]",)]

schema = StructType([StructField("items", StringType(), True)])
df = spark.createDataFrame(data,schema)

def transformRow(row):
    jsonObj = json.loads(row[0])
    rows = [Row(**item) for item in jsonObj]
    return rows

df.rdd.flatMap(transformRow).toDF().show()

output

+-------+--------+--------------------+
|balance|quantity|                type|
+-------+--------+--------------------+
|     35|      25|               coins|
|     40|      25|               coins|
|      2|       2|      column_breaker|
|      2|       2|         row_breaker|
|      2|       2|single_block_breaker|
|      1|       1|             rainbow|
|    140|     135|               coins|
+-------+--------+--------------------+
0 голосов
/ 03 декабря 2018

Если у вас есть несколько JSON с каждой строкой, вы можете использовать трюк для замены запятой между объектами на новую строку и разделения на новую строку с помощью функции explode.Так что для DF вот так:

>>> df.show()
+-----------------+
|            items|
+-----------------+
|         {"a": 1}|
|{"a": 2},{"a": 3}|
+-----------------+

этот код выполняет свою работу:

>>> from pyspark.sql.types import ArrayType, StringType
>>> from pyspark.sql.functions import udf, explode
>>> split_jsons = lambda jsons: jsons.replace('},{', '}\n{').split('\n')
>>> df.withColumn('one_json_per_row', udf(split_jsons, ArrayType(StringType()))('items')) \
...    .select(explode('one_json_per_row').alias('item')).show()
+--------+
|    item|
+--------+
|{"a": 1}|
|{"a": 2}|
|{"a": 3}|
+--------+

Тогда вы можете использовать обычный код

...