Функция условной карты Spark () на основе входных столбцов - PullRequest
1 голос
/ 13 марта 2020

Здесь я пытаюсь добиться условно сгенерированной в Spark функции SQL map в зависимости от того, имеют ли они null, 0 или любое другое значение, которое я могу захотеть.

Возьмем, к примеру, этот начальный DF.

val initialDF = Seq(
  ("a", "b", 1), 
  ("a", "b", null), 
  ("a", null, 0)
).toDF("field1", "field2", "field3")

Из этого исходного DataFrame я хочу создать еще один столбец, который будет картой, как здесь.

initialDF.withColumn("thisMap", MY_FUNCTION)

Мой текущий подход к это, в основном, Seq[String] в методе a flatMap пар ключ-значение, которые получает метод Spark SQL, например:

def toMap(columns: String*): Column = {
  map(
    columns.flatMap(column => List(lit(column), col(column))): _*
  )
}

Но тогда фильтрация становится Scala вещь и это довольно беспорядок.

То, что я хотел бы получить после обработки, было бы для каждой из этих строк следующим DataFrame.

val initialDF = Seq(
  ("a", "b", 1, Map("field1" -> "a", "field2" -> "b", "field3" -> 1)),
  ("a", "b", null, Map("field1" -> "a", "field2" -> "b")),
  ("a", null, 0, Map("field1" -> "a"))
)
  .toDF("field1", "field2", "field3", "thisMap")

Мне было интересно, если это может достигается с помощью Column API, который более интуитивно понятен с .isNull или .equalTo?

Ответы [ 2 ]

2 голосов
/ 15 марта 2020

Вот небольшое улучшение ответа Ламануса выше, которое повторяется только df.columns один раз:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

case class Record(field1: String, field2: String, field3: java.lang.Integer)

val df = Seq(
  Record("a", "b", 1),
  Record("a", "b", null),
  Record("a", null, 0)
).toDS

df.show

// +------+------+------+
// |field1|field2|field3|
// +------+------+------+
// |     a|     b|     1|
// |     a|     b|  null|
// |     a|  null|     0|
// +------+------+------+

df.withColumn("thisMap", map_concat(
    df.columns.map { colName => 
        when(col(colName).isNull or col(colName) === 0, map())
        .otherwise(map(lit(colName), col(colName)))
    }: _*
)).show(false)

// +------+------+------+---------------------------------------+
// |field1|field2|field3|thisMap                                |
// +------+------+------+---------------------------------------+
// |a     |b     |1     |[field1 -> a, field2 -> b, field3 -> 1]|
// |a     |b     |null  |[field1 -> a, field2 -> b]             |
// |a     |null  |0     |[field1 -> a]                          |
// +------+------+------+---------------------------------------+
1 голос
/ 13 марта 2020

ОБНОВЛЕНИЕ

Я нашел способ достичь ожидаемого результата, но он немного грязный.

val df2 = df.columns.foldLeft(df) { (df, n) => df.withColumn(n + "_map", map(lit(n), col(n))) }
val col_cond = df.columns.map(n => when(not(col(n + "_map").getItem(n).isNull || col(n + "_map").getItem(n) === lit("0")), col(n + "_map")).otherwise(map()))
df2.withColumn("map", map_concat(col_cond: _*))
  .show(false)

ОРИГИНАЛ

Вот моя попытка с функцией map_from_arrays, которую можно использовать в свече 2.4 +.

df.withColumn("array", array(df.columns.map(col): _*))
  .withColumn("map", map_from_arrays(lit(df.columns), $"array")).show(false)

Тогда результат будет:

+------+------+------+---------+---------------------------------------+
|field1|field2|field3|array    |map                                    |
+------+------+------+---------+---------------------------------------+
|a     |b     |1     |[a, b, 1]|[field1 -> a, field2 -> b, field3 -> 1]|
|a     |b     |null  |[a, b,]  |[field1 -> a, field2 -> b, field3 ->]  |
|a     |null  |0     |[a,, 0]  |[field1 -> a, field2 ->, field3 -> 0]  |
+------+------+------+---------+---------------------------------------+
...