SQL-эквивалент вашего примера вместе с данными выглядит так:
scala> // Sample rows (ACCOUNTNO, VEHICLENUMBER, CUSTOMERID); the two vehicle-related columns are also packed into a struct column named VEHICLE.
scala> val df = Seq((10003014,"MH43AJ411",20000000),
| (10003014,"MH43AJ411",20000001),
| (10003015,"MH12GZ3392",20000002)
| ).toDF("ACCOUNTNO","VEHICLENUMBER","CUSTOMERID").withColumn("VEHICLE",struct("VEHICLENUMBER","CUSTOMERID"))
df: org.apache.spark.sql.DataFrame = [ACCOUNTNO: int, VEHICLENUMBER: string ... 2 more fields]
scala> // Expose df to Spark SQL under the name "vehicles"; createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0.
scala> df.createOrReplaceTempView("vehicles")
scala> // Group by ACCOUNTNO, collect each account's VEHICLE structs into ACCOUNT_LIST, then serialize every result row to a JSON string.
scala> // NOTE(review): the query says VEHICLES while the table was registered as "vehicles" — presumably relies on case-insensitive name resolution; confirm.
scala> val sqlDF = spark.sql("SELECT ACCOUNTNO, collect_list(VEHICLE) as ACCOUNT_LIST FROM VEHICLES group by ACCOUNTNO").toJSON
sqlDF: org.apache.spark.sql.Dataset[String] = [value: string]
scala> // Passing false keeps the long JSON strings from being truncated in the printed table.
scala> sqlDF.show(false)
+-----------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+-----------------------------------------------------------------------------------------------------------------------------------------------+
|{"ACCOUNTNO":10003014,"ACCOUNT_LIST":[{"VEHICLENUMBER":"MH43AJ411","CUSTOMERID":20000000},{"VEHICLENUMBER":"MH43AJ411","CUSTOMERID":20000001}]}|
|{"ACCOUNTNO":10003015,"ACCOUNT_LIST":[{"VEHICLENUMBER":"MH12GZ3392","CUSTOMERID":20000002}]} |
+-----------------------------------------------------------------------------------------------------------------------------------------------+