Вы можете просто использовать функцию udf
, чтобы выполнить ваше требование.
Но перед этим вам нужно будет получить полный набор из учитывает и получает транслируется для использования в функции udf
.
Возвращенный массив из функции udf
должен быть взорван и , наконец, выбрать столбцы .
import org.apache.spark.sql.functions._
val idList = df.select(collect_set("accountid")).first().getAs[Seq[String]](0)
val broadCastedIdList = sc.broadcast(idList)
def populateUdf = udf((date: String, value: Double, accountid: String)=> Array(accounts(date, value, accountid)) ++ broadCastedIdList.value.filterNot(_ == accountid).map(accounts(date, 0.0, _)))
df.select(populateUdf(col("date"), col("val"), col("accountid")).as("struct"))
.withColumn("struct", explode(col("struct")))
.select(col("struct.date"), col("struct.value").as("val"), col("struct.accountid"))
.show(false)
И, конечно, вам понадобится case class
case class accounts(date:String, value:Double, accountid:String)
, который должен дать вам
+----------+-----+---------+
|date |val |accountid|
+----------+-----+---------+
|2018-01-01|100.5|id1 |
|2018-01-01|0.0 |id2 |
|2018-01-02|120.6|id1 |
|2018-01-02|0.0 |id2 |
|2018-01-03|450.2|id2 |
|2018-01-03|0.0 |id1 |
+----------+-----+---------+
Примечание: используется ключевое слово valueв случае класса, потому что зарезервированные имена идентификаторов не могут быть использованы в качестве имен переменных