Добавить производный столбец (в виде массива структуры) на основе значений и упорядочения других столбцов в кадре данных Spark Scala - PullRequest
0 голосов
/ 30 августа 2018

У меня есть фрейм данных Scala Spark с четырьмя столбцами (все строковые типы) - P, Q, R, S - и первичным ключом (называемым PK) (целочисленный тип).

Каждый из этих 4 столбцов может иметь нулевые значения. Порядок столбцов слева направо - это важность / актуальность столбца, и его необходимо сохранить. Структура базового кадра данных остается такой же, как показано.

Я хочу, чтобы конечный результат был следующим:

root
 |-- PK: integer (nullable = true)
 |-- P: string (nullable = true)
 |-- Q: string (nullable = true)
 |-- R: string (nullable = true)
 |-- S: string (nullable = true)
 |-- categoryList: array (nullable = true)
 |    |-- myStruct: struct (nullable = true)
 |    |    |-- category: boolean (nullable = true)
 |    |    |-- relevance: boolean (nullable = true)

Мне нужно создать новый столбец, полученный из 4 столбцов P, Q, R, S на основе следующего алгоритма:

  1. Для каждого элемента в каждой из четырех строк проверьте, существует ли этот элемент в Map "mapM"
  2. Если элемент существует, «категория» в структуре будет соответствующим значением из карты M. Если элемент не существует в карте M, категория должна быть нулевой.
  3. «Соответствие» в структуре должно быть порядка столбца слева направо: P -> 1, Q -> 2, R -> 3, S -> 4.
  4. Массив, образованный этими четырьмя структурами, затем добавляется в новый столбец на предоставленном кадре данных.

Я новичок в Scala, и вот что у меня есть до сих пор:

case class relevanceCaseClass(category: String, relevance: Integer)
def myUdf = udf((code: String, relevance: Integer) => relevanceCaseClass(mapM.value.getOrElse(code, null), relevance))
df.withColumn("newColumn", myUdf(col("P/Q/R/S"), 1))

Проблема в том, что я не могу передать значение порядка внутри функции withColumn. Мне нужно, чтобы функция myUdf знала значение релевантности. Я делаю что-то в корне неправильно?

Таким образом, я должен получить вывод:

PK   P    Q    R    S    newCol
1    a    b    c    null array(struct("a", 1), struct(null, 2), struct("c", 3), struct(null, 4))

Здесь значение "b" не было найдено на карте и, следовательно, значение (для категории) равно нулю. Поскольку значение для столбца S уже было нулевым, оно осталось нулевым. Актуальность в соответствии с порядком левого-правого столбца.

Ответы [ 2 ]

0 голосов
/ 30 августа 2018

Дано входной фрейм данных (тестирование, как указано в OP) как

+---+---+---+---+----+
|PK |P  |Q  |R  |S   |
+---+---+---+---+----+
|1  |a  |b  |c  |null|
+---+---+---+---+----+

root
 |-- PK: integer (nullable = false)
 |-- P: string (nullable = true)
 |-- Q: string (nullable = true)
 |-- R: string (nullable = true)
 |-- S: null (nullable = true)

и Карта вещания как

val mapM = spark.sparkContext.broadcast(Map("a" -> "a", "c" -> "c"))

Вы можете определить функцию udf и вызвать эту функцию udf, как показано ниже

def myUdf = udf((pqrs: Seq[String]) => pqrs.zipWithIndex.map(code => relevanceCaseClass(mapM.value.getOrElse(code._1, "null"), code._2+1)))
val finaldf = df.withColumn("newColumn", myUdf(array(col("P"), col("Q"), col("R"), col("S"))))

с классом корпуса как в OP

case class relevanceCaseClass(category: String, relevance: Integer)

, который должен дать вам желаемый результат, т.е. finaldf будет

+---+---+---+---+----+--------------------------------------+
|PK |P  |Q  |R  |S   |newColumn                             |
+---+---+---+---+----+--------------------------------------+
|1  |a  |b  |c  |null|[[a, 1], [null, 2], [c, 3], [null, 4]]|
+---+---+---+---+----+--------------------------------------+

root
 |-- PK: integer (nullable = false)
 |-- P: string (nullable = true)
 |-- Q: string (nullable = true)
 |-- R: string (nullable = true)
 |-- S: null (nullable = true)
 |-- newColumn: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- relevance: integer (nullable = true)

Надеюсь, ответ полезен

0 голосов
/ 30 августа 2018

Вы можете передать несколько столбцов в udf, как показано в следующем примере кода

  case class Relevance(category: String, relevance: Integer)

  def myUdf = udf((p: String,q: String,s: String,r: String) => Seq(
    Relevance(mapM.value.getOrElse(p, null), 1),
    Relevance(mapM.value.getOrElse(q, null), 2),
    Relevance(mapM.value.getOrElse(s, null), 3),
    Relevance(mapM.value.getOrElse(r, null), 4)
  ))

  df.withColumn("newColumn", myUdf(df("P"),df("Q"),df("S"),df("R")))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...