Как я понимаю, вы пытаетесь получить среднее значение для каждого столбца в соответствии с флагом и стандартным отклонением каждого столбца независимо от флага. После этого вы применяете формулу и рассчитываете RPB.
На основе тех же логик c я взял пример данных и записал код без l oop. Это будет быстрее, чем l oop logi c, который вы используете. Spark плохо работает с l oop logi c, поэтому попробуйте собрать все необходимые данные в одну строку (например, avg0, avg1 и StdDev в примере ниже) и после этого обрабатывать горизонтально или в пакетном режиме.
Обратите внимание, как я прокомментировал выше. Я не понял значения p и q, поэтому проигнорировал его в окончательном логе выходных данных c. Вы можете добавить напрямую, если это переменные, объявленные ранее.
scala> import org.apache.spark.sql.types._
scala> val df = Seq(
| ("121", "442", "512","1"),
| ("134", "434", "752","0"),
| ("423", "312", "124","1"),
| ("432", "677", "752","0"),
| ("332", "424", "111","1")).
| toDF("col1","col2","col3","cust_flag").
| withColumn("col1", $"col1".cast(DoubleType)).
| withColumn("col2", $"col2".cast(DoubleType)).
| withColumn("col3", $"col3".cast(DoubleType))
scala> df.show
+-----+-----+-----+---------+
| col1| col2| col3|cust_flag|
+-----+-----+-----+---------+
|121.0|442.0|512.0| 1|
|134.0|434.0|752.0| 0|
|423.0|312.0|124.0| 1|
|432.0|677.0|752.0| 0|
|332.0|424.0|111.0| 1|
+-----+-----+-----+---------+
scala>val colSeq = Seq("col1", "col2", "col3")
scala> val aggdf = colSeq.map(c => {
| df.groupBy("cust_flag").agg( lit(c).alias("columnName"), avg(c).cast("Decimal(14,2)").alias("avg"))
| })
scala> val devdf = colSeq.map(c => {
| df.agg( lit(c).alias("columnName"), stddev(c).cast("Decimal(14,2)").alias("StdDev"))
| })
scala> val avgDF = aggdf.reduce(_ union _)
scala> val stdDevDF = devdf.reduce(_ union _)
scala> val finalAvgDF = avgDF.filter(col("cust_flag") === 1).alias("1").join(avgDF.filter(col("cust_flag") === 0).alias("0"), List("columnName")).select(col("columnName"), col("1.avg").alias("avg1"), col("0.avg").alias("avg0"))
scala> val outDF = finalAvgDF.join(stdDevDF, List("columnName"))
scala> outDF.show()
+----------+------+------+------+
|columnName| avg1| avg0|StdDev|
+----------+------+------+------+
| col1|292.00|283.00|152.07|
| col2|392.67|555.50|133.48|
| col3|249.00|752.00|319.16|
+----------+------+------+------+
//apply your final formula to ger rpb
scala> outDF.withColumn("rpb", (col("avg1") - col("avg0"))/col("StdDev")).show
+----------+------+------+------+--------------------+
|columnName| avg1| avg0|StdDev| rpb|
+----------+------+------+------+--------------------+
| col1|292.00|283.00|152.07| 0.05918327086210298|
| col2|392.67|555.50|133.48|-1.21988312855858556|
| col3|249.00|752.00|319.16|-1.57601203158290513|
+----------+------+------+------+--------------------+