Моя задача состоит в том, чтобы обновить фрейм данных spark, имеющий столбец типа string и struct. Я пытаюсь сделать в scala - PullRequest
0 голосов
/ 02 апреля 2020
Id         Identifers
'123'      {"country":"PR", "idType":"SELECTED","status":"Not Done"}
'234'      {"country":"PR", "idType":"NOT SELECTED","status":"Not Done"}

Необходимо изменить $Identifers.status = "Done", когда idType равно "SELECTED"

Так ожидается OutPut будет

Id         Identifers
'123'      {"country":"PR", "idType":"SELECTED","status":"Not Done"}
'234'      {"country":"PR", "idType":"NOT SELECTED","status":"Done"}

Я пытался использовать

df.withColumn("$NewIdentifers", when($"Identifers.idType" === "SELECTED", "DONE"))

Но это не помогло, давая Null

1 Ответ

0 голосов
/ 02 апреля 2020

Начиная с Spark-2.2 :

Вы можете использовать from_json (для создания отдельных столбцов) и to_json ( для этого случая воссоздайте json объект) во встроенных функциях.

Example:

//sample data
df.show(false)
//+---+-------------------------------------------------------------+
//|Id |Identifiers                                                  |
//+---+-------------------------------------------------------------+
//|123|{"country":"PR", "idType":"SELECTED","status":"Not Done"}    |
//|234|{"country":"PR", "idType":"NOT SELECTED","status":"Not Done"}|
//+---+-------------------------------------------------------------+

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

//defining the schema
val sch=new StructType().add("country",StringType).add("idType",StringType).add("status",StringType)

//read the identifiers using from_json and pass the schema
val df1=df.withColumn("jsn",from_json(col("Identifiers"),sch)).select("Id","jsn.*")

//required json cols
val jsn_cols=df1.columns.filter(_.toLowerCase != "id")

//here we are using when otherwise and updating status column then recreating json object using to_json function

df1.withColumn("status",when(col("idType") === "SELECTED",lit("Done")).otherwise(col("status"))).
withColumn("identifiers",to_json(struct(jsn_cols.head,jsn_cols.tail:_*))).
drop(jsn_cols:_*).
show(false)

//+---+------------------------------------------------------------+
//|Id |identifiers                                                 |
//+---+------------------------------------------------------------+
//|123|{"country":"PR","idType":"SELECTED","status":"Done"}        |
//|234|{"country":"PR","idType":"NOT SELECTED","status":"Not Done"}|
//+---+------------------------------------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...