Я должен добавить или зажечь более одного столбца в другом состоянии.Когда я ставлю более одного столбца, я получаю сообщение об ошибке.
Есть ли способ заменить или каким-либо другим способом то, что я пытаюсь сделать?
Это то, что я делаю сейчас.
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("FundamentalSeriesId", "FundamentalSeriesPeriodEndDate", "FundamentalSeriesPeriodType")
val windowSpec2 = Window.partitionBy("FundamentalSeriesId", "FundamentalSeriesPeriodEndDate", "FundamentalSeriesPeriodType", "group").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
def containsUdf = udf { (array: Seq[String]) => array.contains("null") || array.contains("NULL") || array.contains(null) }
val latestForEachKey1 = tempReorder
.withColumn("group", when(containsUdf(collect_list("FundamentalSeriesStatementTypeCode").over(windowSpec)), lit("same")).otherwise($"FundamentalSeriesStatementTypeCode"))
.withColumn("rank", row_number().over(windowSpec2))
.filter($"rank" === 1).drop("rank", "group")
Но когда я добавляю более одного столбца в другой части, я получаю ошибку.Ниже код выдает ошибку.
val latestForEachKey1 = tempReorder
.withColumn("group", when(containsUdf(collect_list("FundamentalSeriesStatementTypeCode").over(windowSpec)), lit("same")).otherwise($"FundamentalSeriesStatementTypeCode",$"FundamentalSeriesStatementPeriodId"))
.withColumn("rank", row_number().over(windowSpec2))
.filter($"rank" === 1).drop("rank", "group")