Spark конвертирует тип поля столбца, который находится в Json, в несколько строк или вложенных строк - PullRequest
0 голосов
/ 03 октября 2018

У меня есть json, как показано ниже, это всего лишь один фрагмент данных.Таким образом, фактический сжатый JSON имеет много, если данные такого типа

 {
    "filed1": "value1",
    "filed2": "value2",
    "data":"{\"info\":[{\"type\":[\"Extra\"],\"value\":9},{\"type\":[\"Free\"],\"value\":8},{\"type\":[\"Actual\"],\"value\":100}]}",
    "code": "0000"
}
{
    "filed1": "value3",
    "filed2": "value4",
    "data":"{\"info\":[{\"type\":[\"Extra\"],\"value\":1001}]}",
    "code": "0001"
}

{
    "filed1": "value5",
    "filed2": "value6",
    "data":"{\"info\":[{\"type\":[\"Actual\"],\"value\":90},{\"type\":[\"Free\"],\"value\":80}]}",
    "code": "0003"
}

, когда я читаю это в искре, столбец данных читается как String, поэтому мне нужно проанализировать и сделать столбцы, как показано ниже, Здесь каждая строканужно преобразовать в несколько строк

filed1   filed2  code  type    Value
value1   value2  0000  Extra   9
value1   value2  0000  Free    8
value1   value2  0000  Actual  100
value3   value4  0001  Extra   1001
value5   value6  0003  Actual  90
value5   value6  0003  Free    80

Я написал ниже udfs, но я не знаю, как создать несколько строк для одной введенной строки

val getTypeName = udf((strs:String) => {
 // parse json and return types
  })

val getValue = udf((strs:String) => {
 // parse json and return values
  })

val df = spark.read.json("<pathtojson">)
val df1 = df.withColumn("type", getTypeName("data")).withColumn("value", getValue("data"))

, но с помощью логики я могу получитьтолько одна строка, я хочу, чтобы это преобразовало два числа строк в соответствии с моими полями данных

1 Ответ

0 голосов
/ 03 октября 2018

Функция, которую вы ищете, называется explode.По сути, вы хотите написать UDF, которая анализирует ваш JSON и выдает массив вложенных строк.Затем вы звоните взорвать на этой колонке.Explode возьмет столбец с несколькими значениями и создаст новую строку для каждого значения (дублируя значения в других столбцах).Например:

case class DataRow(filed1: String, filed2: String, data: String, code: String)

val df = Seq(
    DataRow(
        "value1",
        "value2",
        "{\"info\":[{\"type\":[\"Extra\"],\"value\":9},{\"type\":[\"Free\"],\"value\":8},{\"type\":[\"Actual\"],\"value\":100}]}",
        "0000"
    ),
    DataRow(
        "value3",
        "value4",
        "{\"info\":[{\"type\":[\"Extra\"],\"value\":1001}]}",
        "0001"
    )
).toDF 

case class NestedRow(row_type: String, value: Int)
def processJsonFn(json: String): Seq[NestedRow] = {
    // ... Parse json ...
    val parsed = Seq(NestedRow("Extra", 9), NestedRow("Actual", 100))

    parsed
}
val processJson = udf(processJsonFn _)

// Convert string json to nested rows
val df2 = df.withColumn("data", processJson($"data"))

// Explode them
val df3 = df2.withColumn("data", explode($"data"))

// Flatten structure
val df4 = df3.select($"filed1", $"filed2", $"data.row_type" as "type", $"data.value" as "value", $"code")

df4.printSchema
df4.show

Выводит это:

root
 |-- filed1: string (nullable = true)
 |-- filed2: string (nullable = true)
 |-- type: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- code: string (nullable = true)


scala> df4.show
+------+------+------+-----+----+
|filed1|filed2|  type|value|code|
+------+------+------+-----+----+
|value1|value2| Extra|    9|0000|
|value1|value2|Actual|  100|0000|
|value3|value4| Extra|    9|0001|
|value3|value4|Actual|  100|0001|
+------+------+------+-----+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...