Для l oop, чтобы выбрать столбец в Scala - PullRequest
0 голосов
/ 21 января 2020

Я хочу достичь ниже в scala для фрейма данных искры,

  1. Для каждого столбца выберите colname и пометить переменную (0 или 1)
  2. найти среднее значение столбец, когда флаг = 0, а затем, когда флаг = 1
  3. стандартное отклонение столбца

Я не уверен, как l oop через столбцы и выбрать каждый столбец и переменную флага каждая итерация l oop. Я попытался: -

 for (a <- colnames) {
      val dat1 = data.filter($"cust_flag".isin("1")).select(a)
      val dat0 = data.filter($"cust_flag".isin("0")).select(a)
      val m0 = dat1.select(avg(a)).asInstanceOf[Double]
      val m1 = dat0.select(avg(a)).asInstanceOf[Float]
      val stdev = data.agg(stddev(a)).asInstanceOf[Float]
      val rpb = ((m1 - m0) / stdev)*p*q
      println(rpb)

Теперь я получаю ошибку - Исключение в потоке "main" java .lang.ClassCastException: org. apache .spark. sql .Dataset не может быть брошенным на java .lang.Float

Ответы [ 4 ]

1 голос
/ 21 января 2020

У нас есть прямая функция для среднее () и stddev ()

Создание двух наборов данных фильтра

ie.

1 для flag = 0 и 2 для flag = 1 и

dfcol0= df.filter(df("colname") === "0")
dfcol1= df.filter(df("colname") === "1")

Теперь с помощью stddev () и mean () функция получает то, что требуется.

 dfcol0.select(stddev("coname")).show(false)
 dfcol0.select(mean("coname")).show(false)
1 голос
/ 21 января 2020

Чтобы создать столбец с заданным именем из строки, можно использовать простой способ:

import org.apache.spark.sql.{functions => sf}

df.select(sf.col(colName))

Вы можете комбинировать это в контрольной логике c (ваш l oop), когда вы сочтите нужным.

Если вы хотите знать, какие столбцы находятся в кадре данных, используйте df.columns.

0 голосов
/ 21 января 2020

Я бы посоветовал вам использовать df.selectExpr(), который может принимать последовательность строк:

val expressions = Seq("avg(col1) as avg_col1","std_dev(col1) as sd_col1", "...")

df.selectExpr(expressions:_*)

Вы можете сделать почти все с этой функцией, создавая массив выражений, как вы будете sh в l oop.

В любом случае я предлагаю вам показать пример ожидаемого ввода / вывода (код, который вы написали, мало что говорит).

0 голосов
/ 21 января 2020

Как я понимаю, вы пытаетесь получить среднее значение для каждого столбца в соответствии с флагом и стандартным отклонением каждого столбца независимо от флага. После этого вы применяете формулу и рассчитываете RPB.

На основе тех же логик c я взял пример данных и записал код без l oop. Это будет быстрее, чем l oop logi c, который вы используете. Spark плохо работает с l oop logi c, поэтому попробуйте собрать все необходимые данные в одну строку (например, avg0, avg1 и StdDev в примере ниже) и после этого обрабатывать горизонтально или в пакетном режиме.

Обратите внимание, как я прокомментировал выше. Я не понял значения p и q, поэтому проигнорировал его в окончательном логе выходных данных c. Вы можете добавить напрямую, если это переменные, объявленные ранее.

scala> import org.apache.spark.sql.types._
scala> val df = Seq(
     |   ("121",  "442",  "512","1"),
     |   ("134",  "434",  "752","0"),
     |   ("423",  "312",  "124","1"),
     |     ("432",  "677",  "752","0"),
     |   ("332",  "424",  "111","1")).
     |   toDF("col1","col2","col3","cust_flag").
     |   withColumn("col1", $"col1".cast(DoubleType)).
     |   withColumn("col2", $"col2".cast(DoubleType)).
     |   withColumn("col3", $"col3".cast(DoubleType))

scala> df.show
+-----+-----+-----+---------+
| col1| col2| col3|cust_flag|
+-----+-----+-----+---------+
|121.0|442.0|512.0|        1|
|134.0|434.0|752.0|        0|
|423.0|312.0|124.0|        1|
|432.0|677.0|752.0|        0|
|332.0|424.0|111.0|        1|
+-----+-----+-----+---------+

scala>val colSeq =  Seq("col1", "col2", "col3")


scala>  val aggdf  = colSeq.map(c => {
     | df.groupBy("cust_flag").agg( lit(c).alias("columnName"), avg(c).cast("Decimal(14,2)").alias("avg"))
     | })


scala> val devdf  = colSeq.map(c => {
     |   df.agg( lit(c).alias("columnName"), stddev(c).cast("Decimal(14,2)").alias("StdDev"))
     |   })

scala> val avgDF = aggdf.reduce(_ union _)

scala> val stdDevDF = devdf.reduce(_ union _)

scala> val finalAvgDF = avgDF.filter(col("cust_flag") === 1).alias("1").join(avgDF.filter(col("cust_flag") === 0).alias("0"), List("columnName")).select(col("columnName"), col("1.avg").alias("avg1"), col("0.avg").alias("avg0"))

scala> val outDF = finalAvgDF.join(stdDevDF, List("columnName"))

scala> outDF.show()
+----------+------+------+------+                                               
|columnName|  avg1|  avg0|StdDev|
+----------+------+------+------+
|      col1|292.00|283.00|152.07|
|      col2|392.67|555.50|133.48|
|      col3|249.00|752.00|319.16|
+----------+------+------+------+

//apply your final formula to ger rpb
scala> outDF.withColumn("rpb", (col("avg1") - col("avg0"))/col("StdDev")).show
+----------+------+------+------+--------------------+                          
|columnName|  avg1|  avg0|StdDev|                 rpb|
+----------+------+------+------+--------------------+
|      col1|292.00|283.00|152.07| 0.05918327086210298|
|      col2|392.67|555.50|133.48|-1.21988312855858556|
|      col3|249.00|752.00|319.16|-1.57601203158290513|
+----------+------+------+------+--------------------+ 
...