Как создать новый столбец для набора данных, используя ".withColumn" со многими условиями в Scala Spark - PullRequest
0 голосов
/ 19 декабря 2018

У меня есть следующий входной массив

val bins = (("bin1",1.0,2.0),("bin2",3.0,4.0),("bin3",5.0,6.0))

В основном строки «bin1» относятся к значениям в ссылочном столбце, по которому фильтруется информационный кадр - новый столбец создается из другого столбца на основе условий границы воставшиеся две пары в массиве

var number_of_dataframes = bins.length
var ctempdf = spark.createDataFrame(sc.emptyRDD[Row],train_data.schema)
ctempdf = ctempdf.withColumn(colName,col(colName))
val t1 = System.nanoTime
for ( x<- 0 to binputs.length-1)

{
      var tempdf = train_data.filter(col(refCol) === bins(x)._1)
      //println(binputs(x)._1)
      tempdf = tempdf.withColumn(colName,
                                 when(col(colName) < bins(x)._2, bins(x)._2)
                                 when(col(colName) > bins(x)._3, bins(x)._3)
                                 otherwise(col(colName)))
      ctempdf = ctempdf.union(tempdf)
val duration = (System.nanoTime - t1) / 1e9d
println(duration)     
}

Приведенный выше код работает постепенно для каждого увеличивающегося значения бинов - есть ли способ, которым я могу резко ускорить это - потому что этот код снова вложен в другой цикл.

Я использовал контрольную точку / постоянный / кэш, и это не помогает

1 Ответ

0 голосов
/ 19 декабря 2018

Здесь нет необходимости в итеративном объединении.Создайте литерал map<string, struct<double, double>>, используя o.a.s.sql.functions.map (в функциональном смысле он ведет себя как задержанный string => struct<lower: dobule, upper: double>)

import org.apache.spark.sql.functions._

val bins: Seq[(String, Double Double)] = Seq(
  ("bin1",1.0,2.0),("bin2",3.0,4.0),("bin3",5.0,6.0))

val binCol = map(bins.map { 
  case (key, lower, upper) => Seq(
    lit(key), 
    struct(lit(lower) as "lower", lit(upper) as "upper")) 
}.flatten: _*)

определяют выражения, подобные этим (это простой поиск в предопределенном отображении,поэтому binCol(col(refCol)) задерживается struct<lower: dobule, upper: double>, а оставшееся apply занимает поле lower или upper):

val lower = binCol(col(refCol))("lower")
val upper =  binCol(col(refCol))("upper")
val c = col(colName)

и используется CASE ... WHEN ... ( искровой эквивалент IF тогда.ELSE )

val result = when(c.between(lower, upper), c)
  .when(c < lower, lower)
  .when(c > upper, upper)

выберите и отбросьте NULL s:

df
  .withColumn(colName, result)
  // If value is still NULL it means we didn't find refCol key in binCol keys.
  // To mimic .filter(col(refCol) === ...) we drop the rows
  .na.drop(Seq(colName))

В этом решении предполагается, что в начале colName нет значений NULL, но может быть легко настроен для обработки случаев, когда это предположение не выполняется.

Если процесс все еще неясен, я бы рекомендовал отслеживать его поэтапно с литералами:

spark.range(1).select(binCol as "map").show(false)
+------------------------------------------------------------+
|map                                                         |
+------------------------------------------------------------+
|[bin1 -> [1.0, 2.0], bin2 -> [3.0, 4.0], bin3 -> [5.0, 6.0]]|
+------------------------------------------------------------+
spark.range(1).select(binCol(lit("bin1")) as "value").show(false)
+----------+
|value     |
+----------+
|[1.0, 2.0]|
+----------+
spark.range(1).select(binCol(lit("bin1"))("lower") as "value").show
+-----+
|value|
+-----+
|  1.0|
+-----+

и далее со ссылкой на Запрос Spark SQL DataFrame со сложными типами .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...