использование sparksql и spark dataframe Как найти имя столбца на основе минимального значения в строке - PullRequest
0 голосов
/ 15 ноября 2018

у меня есть датафрейм df.он имеет 4 столбца

+-------+-------+-------+-------+  
| dist1 | dist2 | dist3 | dist4 |
+-------+-------+-------+-------+  
|  42   |  53   |  24   |  17   |
+-------+-------+-------+-------+  

вывод, который я хочу получить

dist4

, кажется простым, но я не нашел подходящего решения, используя запрос к фрейму данных или sparksql

Ответы [ 5 ]

0 голосов
/ 15 ноября 2018

способ RDD и без udf () s.

scala> val df = Seq((1,2,3,4),(5,4,3,1)).toDF("A","B","C","D")
df: org.apache.spark.sql.DataFrame = [A: int, B: int ... 2 more fields]

scala> val df2 = df.withColumn("arr", array(df.columns.map(col(_)):_*))
df2: org.apache.spark.sql.DataFrame = [A: int, B: int ... 3 more fields]

scala>  val rowarr = df.columns
rowarr: Array[String] = Array(A, B, C, D)

scala> val rdd1 = df2.rdd.map( x=> {val p = x.getAs[WrappedArray[Int]]("arr").toArray; val q=rowarr(p.indexWhere(_==p.min));Row.merge(x,Row(q)) })
rdd1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[83] at map at <console>:47

scala> spark.createDataFrame(rdd1,df2.schema.add(StructField("mincol",StringType))).show
+---+---+---+---+------------+------+
|  A|  B|  C|  D|         arr|mincol|
+---+---+---+---+------------+------+
|  1|  2|  3|  4|[1, 2, 3, 4]|     A|
|  5|  4|  3|  1|[5, 4, 3, 1]|     D|
+---+---+---+---+------------+------+


scala>
0 голосов
/ 15 ноября 2018
Try this,

df.show
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  1|  2|  3|  4|
|  5|  4|  3|  1|
+---+---+---+---+

val temp_df = df.columns.foldLeft(df) { (acc: DataFrame, colName: String) => acc.withColumn(colName, concat(col(colName), lit(","+colName)))}

val minval = udf((ar: Seq[String]) => ar.min.split(",")(1))

val result = temp_df.withColumn("least", split(concat_ws(":",x.columns.map(col(_)):_*),":")).withColumn("least_col", minval(col("least")))

result.show
+---+---+---+---+--------------------+---------+
|  A|  B|  C|  D|               least|least_col|
+---+---+---+---+--------------------+---------+
|1,A|2,B|3,C|4,D|[1,A, 2,B, 3,C, 4,D]|        A|
|5,A|4,B|3,C|1,D|[5,A, 4,B, 3,C, 1,D]|        D|
+---+---+---+---+--------------------+---------+
0 голосов
/ 15 ноября 2018

Вы можете получить доступ к схеме строк, получить список имен оттуда и получить доступ к значению строк по имени, а затем выяснить это таким образом.

см .: https://spark.apache.org/docs/2.3.2/api/scala/index.html#org.apache.spark.sql.Row

это будет выглядеть примерно так

dataframe.map(
    row => {
        val schema = row.schema
        val fieldNames:List[String] =  ??? //extract names from schema
        fieldNames.foldLeft(("", 0))(???) // retireve field value using it's name and retain maxiumum
    }
)

Это даст Dataset[String]

0 голосов
/ 15 ноября 2018

вы можете сделать что-то вроде,

import org.apache.spark.sql.functions._

val cols = df.columns
val u1 = udf((s: Seq[Int]) => cols(s.zipWithIndex.min._2))

df.withColumn("res", u1(array("*")))
0 голосов
/ 15 ноября 2018

Вы можете использовать least функцию как

select least(dist1,dist2,dist3,dist4) as min_dist
  from yourTable;

Для противоположных случаев можно использовать greatest.

РЕДАКТИРОВАТЬ: Для определения имен столбцов для получения строк может использоваться следующее

select inline(array(struct(42, 'dist1'), struct(53, 'dist2'), 
                    struct(24, 'dist3'), struct(17, 'dist4') ))

42  dist1
53  dist2
24  dist3
17  dist4 

и затем min функция может быть применена для получения dist4

...