Transactions is an array type, and you are accessing TransactionId and OrgTypeToExclude inside it, so you would end up with multiple arrays. Instead, simply explode the root-level Transactions array and extract the struct fields OrgTypeToExclude and TransactionId; the remaining steps are then easy. Please check the code below.
scala> val jsonstr ="""{
|
| "id": "3b4219f8-0579-4933-ba5e-c0fc532eeb2a",
| "Transactions": [
| {
| "TransactionId": "USAL",
| "OrgTypeToExclude": ["A","B"]
| },
| {
| "TransactionId": "USMD",
| "OrgTypeToExclude": ["E"]
| },
| {
| "TransactionId": "USGA",
| "OrgTypeToExclude": []
| }
| ]
| }"""
jsonstr: String =
{
"id": "3b4219f8-0579-4933-ba5e-c0fc532eeb2a",
"Transactions": [
{
"TransactionId": "USAL",
"OrgTypeToExclude": ["A","B"]
},
{
"TransactionId": "USMD",
"OrgTypeToExclude": ["E"]
},
{
"TransactionId": "USGA",
"OrgTypeToExclude": []
}
]
}
scala> val df = Seq((1, "USAL","A"),(4, "USAL","C"), (2, "USMD","B"),(5, "USMD","E"), (3, "USGA","C")).toDF("id", "code","Alp")
df: org.apache.spark.sql.DataFrame = [id: int, code: string ... 1 more field]
scala> val json = spark.read.json(Seq(jsonstr).toDS).select(explode($"Transactions").as("Transactions")).select($"Transactions.*")
json: org.apache.spark.sql.DataFrame = [OrgTypeToExclude: array<string>, TransactionId: string]
scala> df.show(false)
+---+----+---+
|id |code|Alp|
+---+----+---+
|1 |USAL|A |
|4 |USAL|C |
|2 |USMD|B |
|5 |USMD|E |
|3 |USGA|C |
+---+----+---+
scala> json.show(false)
+----------------+-------------+
|OrgTypeToExclude|TransactionId|
+----------------+-------------+
|[A, B] |USAL |
|[E] |USMD |
|[] |USGA |
+----------------+-------------+
scala> df.join(json, (df("code") === json("TransactionId") && !array_contains(json("OrgTypeToExclude"), df("Alp"))), "inner").select("id","code","Alp").show(false)
+---+----+---+
|id |code|Alp|
+---+----+---+
|4 |USAL|C |
|2 |USMD|B |
|3 |USGA|C |
+---+----+---+
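If the exploded JSON lookup stays this small, you could also hint a broadcast join so Spark does not shuffle the larger DataFrame. This is only a minimal sketch under that assumption, reusing the df and json values from the session above; the broadcast hint and the filtered name are illustrative additions, not part of the original code.

import org.apache.spark.sql.functions.{array_contains, broadcast}

// Same inner join and exclusion filter as above, but broadcasting the small lookup side.
val filtered = df.join(
    broadcast(json),
    df("code") === json("TransactionId") &&
      !array_contains(json("OrgTypeToExclude"), df("Alp")),
    "inner"
  )
  .select("id", "code", "Alp")

filtered.show(false)

The result should match the three rows shown above, since the broadcast hint only affects the physical plan, not the join semantics.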