Вы также можете решить эту проблему с помощью более классического подхода, используя встроенные в SQL функции when-case:
import sparkSession.implicits._
val defectiveItems = Seq(
(1, "gem1", Map("gem1" -> 10, "gem2" -> 0, "gem3" -> 0)),
(2, "gem1", Map("gem1" -> 15, "gem2" -> 0, "gem3" -> 0)),
(3, "gem1", Map("gem1" -> 33, "gem2" -> 0, "gem3" -> 0)),
(4, "gem3", Map("gem1" -> 0, "gem2" -> 0, "gem3" -> 2))
).toDF("Id", "defectiveItem", "item")
import org.apache.spark.sql.functions._
val datasetWithCount = defectiveItems.withColumn("count", when($"defectiveItem" === "gem1", $"item.gem1").otherwise(when($"defectiveItem" === "gem2", $"item.gem2").otherwise($"item.gem3")))
println("All items="+datasetWithCount.collectAsList())
Будет напечатано:
All items=[[1,gem1,Map(gem1 -> 10, gem2 -> 0, gem3 -> 0),10], [2,gem1,Map(gem1 -> 15, gem2 -> 0, gem3 -> 0),15], [3,gem1,Map(gem1 -> 33, gem2 -> 0, gem3 -> 0),33], [4,gem3,Map(gem1 -> 0, gem2 -> 0, gem3 -> 2),2]]
Используя собственные решения, вы можете воспользоваться преимуществами внутренней оптимизации Spark для планов выполнения.