У меня есть два набора данных AccountData и CustomerData , с соответствующими классами дел:
case class AccountData(customerId: String, forename: String, surname: String)
customerId|accountId|balance|
+----------+---------+-------+
| IND0002| ACC0002| 200|
| IND0002| ACC0022| 300|
| IND0003| ACC0003| 400|
+----------+---------+-------+
case class CustomerData(customerId: String, accountId: String, balance: Long)
+----------+-----------+--------+
|customerId| forename| surname|
+----------+-----------+--------+
| IND0001|Christopher| Black|
| IND0002| Madeleine| Kerr|
| IND0003| Sarah| Skinner|
+----------+-----------+--------+
Как получить следующий набор данных, который добавляет столбец account , содержащий Seq [ AccountData ] каждого customerId ?
+----------+-----------+----------------------------------------------+
|customerId|forename |surname |accounts |
+----------+-----------+----------+---------------------------------- +
|IND0001 |Christopher|Black |[]
|IND0002 |Madeleine |Kerr |[[IND0002,ACC002,200],[IND0002,ACC0022,300]]
|IND0003 |Sarah |Skinner |[[IND0003,ACC003,400]
Я пробовал:
val joinCustomerAndAccount = accountDS.joinWith(customerDS, customerDS("customerId") === accountDS("customerId")).drop(col("_2"))
, который дает мне следующий фрейм данных:
+---------------------+
|_1 |
+---------------------+
|[IND0002,ACC0002,200]|
|[IND0002,ACC0022,300]|
|[IND0003,ACC0003,400]|
+---------------------+
Если я тогда сделаю:
val result = customerDS.withColumn("accounts", joinCustomerAndAccount("_1")(0))
Я получаю следующее исключение:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Field name should be String Literal, but it's 0;