Spark генерирует список имен столбцов, который содержит (SQL LIKE) строку - PullRequest
0 голосов
/ 11 января 2019

Ниже приведен простой синтаксис для поиска строки в определенном столбце с использованием функциональности SQL Like.

val dfx = df.filter($"name".like(s"%${productName}%"))

Вопрос в том, как мне взять каждый столбец ИМЯ , который содержал конкретную строку в его ЗНАЧЕНИЯХ, и создать новый столбец со списком этих "имен столбцов" для каждой строки.

До сих пор я использовал этот подход, но застрял, поскольку не могу использовать spark-sql функцию "Like" внутри UDF.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._

import spark.implicits._
val df1 = Seq(
  (0, "mango", "man", "dit"), 
  (1, "i-man", "man2", "mane"),
  (2, "iman", "mango", "ho"),
  (3, "dim",  "kim", "sim")
).toDF("id", "col1", "col2", "col3")

val df2 = df1.columns.foldLeft(df1) {
  (acc: DataFrame, colName: String) =>
    acc.withColumn(colName, concat(lit(colName + "="), col(colName)))
}

val df3 = df2.withColumn("merged_cols", split(concat_ws("X",  df2.columns.map(c=> col(c)):_*), "X"))

Вот пример вывода. Обратите внимание , что здесь есть только 3 столбца, но в реальной работе я буду читать несколько таблиц, которые могут содержать динамическое количество столбцов.

+--------------------------------------------+
|id  |   col1|  col2|  col3|      merged_cols
+--------------------------------------------+
  0  |  mango| man  |  dit | col1, col2
  1  |  i-man| man2 | mane | col1, col2, col3
  2  |  iman | mango| ho   | col1, col2
  3  |   dim |  kim |   sim| 
+--------------------------------------------+

1 Ответ

0 голосов
/ 11 января 2019

Это можно сделать, используя foldLeft над столбцами вместе с when и otherwise:

val e = "%man%"

val df2 = df1.columns.foldLeft(df.withColumn("merged_cols", lit(""))){(df, c) => 
    df.withColumn("merged_cols", when(col(c).like(e), concat($"merged_cols", lit(s"$c,"))).otherwise($"merged_cols"))}
  .withColumn("merged_cols", expr("substring(merged_cols, 1, length(merged_cols)-1)"))

Все столбцы, удовлетворяющие условию e, будут добавлены к строке в столбце merged_cols. Обратите внимание, что для работы первого добавления столбец должен существовать, поэтому он добавляется (содержит пустую строку) в кадр данных при отправке в foldLeft.

Последняя строка в коде просто удаляет лишние ,, которые добавляются в конце. Если вы хотите получить результат в виде массива, просто добавьте .withColumn("merged_cols", split($"merged_cols", ",")).


Альтернативный способ оценки - использовать UDF. Это может быть предпочтительным при работе со многими столбцами, поскольку foldLeft создаст несколько копий данных. Здесь используется регулярное выражение (не как в SQL, так как оно работает с целыми столбцами).

val e = ".*man.*"

val concat_cols = udf((vals: Seq[String], names: Seq[String]) => {
  vals.zip(names).filter{case (v, n) => v.matches(e)}.map(_._2)
})

val df2 = df.withColumn("merged_cols", concat_cols(array(df.columns.map(col(_)): _*), typedLit(df.columns.toSeq)))

Примечание : typedLit можно использовать в версиях Spark 2.2+, при использовании более старых версий используйте array(df.columns.map(lit(_)): _*).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...