Ваша строка кода hBaseRDD_iacp.map(x => systems(x._1, x._2, x._3)).toDF
должна генерировать кадр данных, эквивалентный следующему:
val df = Seq(
("ab7", Some("0.051,0.052,0.055"), Some("17.326,17.344,17.21")),
("k6c", Some("0.056,NA,0.054"), Some("17.277,17.283,17.256")),
("ad", Some("NA,23.0"), Some("24.0,23.6"))
).toDF("rowkey", "iacp", "temp")
Чтобы преобразовать набор данных в требуемый результат, вы можете применить UDF, который объединяет элементы строк CSV iacp
и temp
, чтобы создать массив (Option[Double], Option[Double])
, который затем explode
-ed, как показано ниже:
import org.apache.spark.sql.functions._
import spark.implicits._
def pairUpCSV = udf{ (s1: String, s2: String) =>
import scala.util.Try
def toNumericArr(csv: String) = csv.split(",").map{
case s if Try(s.toDouble).isSuccess => Some(s)
case _ => None
}
toNumericArr(s1).zipAll(toNumericArr(s2), None, None)
}
df.
withColumn("csv_pairs", pairUpCSV($"iacp", $"temp")).
withColumn("csv_pair", explode($"csv_pairs")).
select($"rowkey", $"csv_pair._1".as("iacp"), $"csv_pair._2".as("temp")).
show(false)
// +------+-----+------+
// |rowkey|iacp |temp |
// +------+-----+------+
// |ab7 |0.051|17.326|
// |ab7 |0.052|17.344|
// |ab7 |0.055|17.21 |
// |k6c |0.056|17.277|
// |k6c |null |17.283|
// |k6c |0.054|17.256|
// |ad |null |24.0 |
// |ad |23.0 |23.6 |
// +------+-----+------+
Обратите внимание, что значение NA
попадает в регистр по умолчанию в методе toNumericArr
, следовательно, не выделяется как отдельный случай. Кроме того, zipAll
(вместо zip
) используется в UDF для охвата случаев, когда строки CSV iacp
и temp
имеют разные размеры элементов.