Spark: условие соединения с массивом (допускающие значение NULL) - PullRequest
0 голосов
/ 08 мая 2020

У меня есть 2 фрейма данных, и я хотел бы присоединиться к ним и отфильтровать данные, я хочу отфильтровать данные
, где OrgTypeToExclude совпадает с каждым идентификатором транзакции.

одним словом my transactionId - это условие соединения, а OrgTypeToExclude - это условие исключения, здесь приводится простой пример

import org.apache.spark.sql.functions.expr
import spark.implicits._
val jsonstr ="""{

  "id": "3b4219f8-0579-4933-ba5e-c0fc532eeb2a",
  "Transactions": [
    {
      "TransactionId": "USAL",
      "OrgTypeToExclude": ["A","B"]
    },
    {
      "TransactionId": "USMD",
      "OrgTypeToExclude": ["E"]
    },
    {
      "TransactionId": "USGA",
      "OrgTypeToExclude": []
    }
    ]   
}"""
val df = Seq((1, "USAL","A"),(4, "USAL","C"), (2, "USMD","B"),(5, "USMD","E"), (3, "USGA","C")).toDF("id", "code","Alp")
val json = spark.read.json(Seq(jsonstr).toDS).select("Transactions.TransactionId","Transactions.OrgTypeToExclude")

df.printSchema()
json.printSchema()
df.join(json,$"code"<=> $"TransactionId".cast("string") && !exp("array_contains(OrgTypeToExclude, Alp)") ,"inner" ).show()

  --Expecting output
 id  Code    Alp 
 4   "USAL"  "C"
 2   "USMD"  "B"
 3   "USGA"  "C"

Спасибо, Манодж.

Ответы [ 2 ]

1 голос
/ 08 мая 2020

Transactions - это тип массива, и вы получаете доступ к TransactionId и OrgTypeToExclude, поэтому вы получите несколько массивов.

Вместо этого вы просто взорвите массив root level Transactions и извлеките структурные значения OrgTypeToExclude и TransactionId, следующие шаги будут легкими.

Пожалуйста, проверьте код ниже .


scala> val jsonstr ="""{
     |
     |   "id": "3b4219f8-0579-4933-ba5e-c0fc532eeb2a",
     |   "Transactions": [
     |     {
     |       "TransactionId": "USAL",
     |       "OrgTypeToExclude": ["A","B"]
     |     },
     |     {
     |       "TransactionId": "USMD",
     |       "OrgTypeToExclude": ["E"]
     |     },
     |     {
     |       "TransactionId": "USGA",
     |       "OrgTypeToExclude": []
     |     }
     |     ]
     | }"""
jsonstr: String =
{

  "id": "3b4219f8-0579-4933-ba5e-c0fc532eeb2a",
  "Transactions": [
    {
      "TransactionId": "USAL",
      "OrgTypeToExclude": ["A","B"]
    },
    {
      "TransactionId": "USMD",
      "OrgTypeToExclude": ["E"]
    },
    {
      "TransactionId": "USGA",
      "OrgTypeToExclude": []
    }
    ]
}

scala> val df = Seq((1, "USAL","A"),(4, "USAL","C"), (2, "USMD","B"),(5, "USMD","E"), (3, "USGA","C")).toDF("id", "code","Alp")
df: org.apache.spark.sql.DataFrame = [id: int, code: string ... 1 more field]

scala> val json = spark.read.json(Seq(jsonstr).toDS).select(explode($"Transactions").as("Transactions")).select($"Transactions.*")
json: org.apache.spark.sql.DataFrame = [OrgTypeToExclude: array<string>, TransactionId: string]

scala> df.show(false)
+---+----+---+
|id |code|Alp|
+---+----+---+
|1  |USAL|A  |
|4  |USAL|C  |
|2  |USMD|B  |
|5  |USMD|E  |
|3  |USGA|C  |
+---+----+---+


scala> json.show(false)
+----------------+-------------+
|OrgTypeToExclude|TransactionId|
+----------------+-------------+
|[A, B]          |USAL         |
|[E]             |USMD         |
|[]              |USGA         |
+----------------+-------------+


scala> df.join(jsondf,(df("code") === jsondf("TransactionId") && !array_contains(jsondf("OrgTypeToExclude"),df("Alp"))),"inner").select("id","code","alp").show(false)
+---+----+---+
|id |code|alp|
+---+----+---+
|4  |USAL|C  |
|2  |USMD|B  |
|3  |USGA|C  |
+---+----+---+


scala>

0 голосов
/ 08 мая 2020

Во-первых, похоже, вы упустили из виду тот факт, что транзакции также являются массивом, с которым мы можем использовать explode для обработки:

val json = spark.read.json(Seq(jsonstr).toDS)
  .select(explode($"Transactions").as("t")) // deal with Transactions array first
  .select($"t.TransactionId", $"t.OrgTypeToExclude")

Кроме того, array_contains хочет значение, а не столбец в качестве второго аргумент. Мне неизвестна версия, которая поддерживает ссылки на столбец, поэтому мы создадим udf:

val arr_con = udf { (a: Seq[String], v: String) => a.contains(v) }

Затем мы можем изменить условие соединения следующим образом:

df.join(json0, $"code" <=> $"TransactionId" && ! arr_con($"OrgTypeToExclude", $"Alp"), "inner").show()

И ожидаемый результат:

scala> df.join(json, $"code" <=> $"TransactionId" && ! arr_con($"OrgTypeToExclude", $"Alp"), "inner").show()
+---+----+---+-------------+----------------+
| id|code|Alp|TransactionId|OrgTypeToExclude|
+---+----+---+-------------+----------------+
|  4|USAL|  C|         USAL|          [A, B]|
|  2|USMD|  B|         USMD|             [E]|
|  3|USGA|  C|         USGA|              []|
+---+----+---+-------------+----------------+
...