Нет способа динамически ссылаться на столбец непосредственно в плане Spark.Следовательно, динамический доступ должен осуществляться либо через структуру данных, которая является частью плана, либо через несколько планов.Это приводит к трем стратегиям решения проблемы:
Использование UDF для динамического обращения к полю в Row
.Это самый общий и самый простой подход.Это работает лучше всего, когда не слишком много столбцов и / или когда данные редки.
Создайте столбец MapType
и создайте ссылку на него.В некоторых случаях это может быть более эффективным, чем (1).
Сделать несколько (легких) проходов через данные и объединить результаты.Лучше всего использовать, когда число столбцов мало, а данные в каждом столбце «тяжелые», например, глубоко структурированные и плотные.
Вот как это сделать (1):
def getColumnAs[A](colName: String, row: Row): Option[A] =
if (row == null) None
else {
val idx = row.fieldIndex(colName)
if (row.isNullAt(idx)) None else Some(row.getAs[A](idx))
}
case class Data(col_name: String, x: Option[Int], y: Option[Int])
val df = spark.createDataset(Seq(
Data("x", Some(1), None),
Data("x", Some(2), Some(20)),
Data("y", None, Some(30))
)).toDF
val colValue = udf(getColumnAs[Int] _)
df.select(
'col_name,
colValue('col_name, struct('*)).as("col_value")
)
.show
Выход
+--------+---------+
|col_name|col_value|
+--------+---------+
| x| 1|
| x| 2|
| y| 30|
+--------+---------+