Необходимо преобразовать фрейм данных, используя API фрейма данных вместо RDD - PullRequest
0 голосов
/ 15 апреля 2020

У меня есть датафрейм со следующими данными

ссуды, MTG, 111 ссуд, MTG 102 ссуды, CRDS, 103 ссуды, PCL, 104 ссуды, PCL, 105

Я хочу получить результат примерно так:

ссуды, MTG: 111: 102, PCL: 104: 105, CRDS: 103

Я могу добиться этого с помощью преобразований СДР

var data  = Seq(("loans","MTG",111),("loans","MTG" ,102),("loans","CRDS",103),("loans","PCL",104),("loans","PCL",105))


var fd1 = sc.parallelize(data)

var fd2 = fd1.map(x => ( (x(0),x(1)) , x(2) ) )

var fd3 = fd2.reduceByKey( (a,b) => a.toString + ":" + b.toString  )

var fd4 = fd3.map( x=> (x._1._1,(x._1._2 + ":"+ x._2)))

var fd5 = fd4.groupByKey()

Я хочу использовать dataframe / Dataset API или может быть spark SQL для достижения того же результата. Не могли бы вы помочь.

1 Ответ

2 голосов
/ 15 апреля 2020

Используйте .groupBy, .collect_list и concat_ws во встроенных функциях из API данных.

Example:

//sample dataframe
var data  = Seq(("loans","MTG",111),("loans","MTG" ,102),("loans","CRDS",103),("loans","PCL",104),("loans","PCL",105)).toDF("col1","col2","col3")

import org.apache.spark.sql.functions._

data.show()
//+-----+----+----+
//| col1|col2|col3|
//+-----+----+----+
//|loans| MTG| 111|
//|loans| MTG| 102|
//|loans|CRDS| 103|
//|loans| PCL| 104|
//|loans| PCL| 105|
//+-----+----+----+

data.groupBy("col1","col2").
agg(concat_ws(":",collect_set("col3")).alias("col3")).
selectExpr("col1","""concat_ws(":",col2,col3) as col2""").
groupBy("col1").
agg(concat_ws(",",collect_list("col2")).alias("col2")).
show(false)

//+-----+--------------------------------+
//|col1 |col2                            |
//+-----+--------------------------------+
//|loans|MTG:102:111,CRDS:103,PCL:104:105|
//+-----+--------------------------------+

//collect
data.groupBy("col1","col2").agg(concat_ws(":",collect_set("col3")).alias("col3")).selectExpr("col1","""concat_ws(":",col2,col3) as col2""").groupBy("col1").agg(concat_ws(",",collect_list("col2")).alias("col2")).collect()
//res22: Array[org.apache.spark.sql.Row] = Array([loans,MTG:102:111,CRDS:103,PCL:104:105])
...