Spark Scala: получить количество ненулевых столбцов в строке данных - PullRequest
0 голосов
/ 26 февраля 2019

У меня есть сценарий, в котором я получаю счетчик ненулевых значений в каждом столбце подряд.

Фрейм данных:

subaccid|srp0|srp1|srp2|srp3|srp4|srp5|srp6|srp7|srp8|srp9|srp10|srp11|srp12
+-------+----+----+----+----+----+----+------+----+----+----+-----+-----+--+    
AAA     |0.0 |12.0|12.0|0.0 |0.0 |0.0 |10.0  |0.0 |0.0 |0.0 |0.0  |0.0  |0.0
AAB     |12.0|12.0|12.0|10.0|12.0|12.0|12.0  |0.0 |0.0 |0.0 |0.0  |0.0  |0.0
AAC     |10.0|12.0|0.0 |0.0 |0.0 |10.0|10.0  |0.0 |0.0 |0.0 |0.0  |0.0  |0.0
ZZZ     |0.0 |0.0 |0.0 |0.0 |0.0 |0.0 |-110.0|0.0 |0.0 |0.0 |0.0  |0.0  |0.0 
+-------+----+----+----+----+----+----+------+----+----+----+-----+-----+--+  

Вывод:

subaccid,count of nonzeros
AAA,2
AAB,7
AAC,4
ZZZ,1

Ответы [ 2 ]

0 голосов
/ 26 февраля 2019

Это также работает, и никаких материалов RDD, мои собственные данные:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = sc.parallelize(Seq(
  ("r1", 0.0, 0.0, 0.0, 0.0),
  ("r2", 6.4, 4.9, 6.3, 7.1),
  ("r3", 4.2, 0.0, 7.2, 8.4),
  ("r4", 1.0, 2.0, 0.0, 0.0)
)).toDF("ID", "a", "b", "c", "d")

val count_non_zero = df.columns.tail.map(x => when(col(x) === 0.0, 1).otherwise(0)).reduce(_ + _)

df.withColumn("non_zero_count", count_non_zero).show(false)

возвращает:

+---+---+---+---+---+--------------+
|ID |a  |b  |c  |d  |non_zero_count|
+---+---+---+---+---+--------------+
|r1 |0.0|0.0|0.0|0.0|4             |
|r2 |6.4|4.9|6.3|7.1|0             |
|r3 |4.2|0.0|7.2|8.4|1             |
|r4 |1.0|2.0|0.0|0.0|2             |
+---+---+---+---+---+--------------+

Предполагая двойной / реальный формат, в противном случае мы получим любой вопрос asInstanceOf.

Вы можете отбросить столбцы или выбрать тяжелую работу.

Надеюсь, это поможет.

0 голосов
/ 26 февраля 2019

Один из вариантов:

    //Create dataframe
     val df = sc.parallelize(
                Seq(("AAA",    0.0, 12.0,12.0,0.0, 0.0, 0.0, 10.0,  0.0, 0.0, 0.0, 0.0,  0.0,  0.0),
                    ("AAB",     12.0, 12.0, 12.0, 10.0, 12.0, 12.0, 12.0, 0.0, 0.0, 0.0, 0.0,  0.0,  0.0),
                    ("AAC",     10.0, 12.0, 0.0, 0.0, 0.0, 10.0, 10.0,  0.0, 0.0, 0.0, 0.0,  0.0, 0.0),
                    ("ZZZ",     0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 110.0,0.0, 0.0, 0.0, 0.0,  0.0,  0.0) 
    )).toDF("subaccid","srp0","srp1","srp2","srp3","srp4","srp5","srp6","srp7","srp8","srp9","srp10","srp11","srp12")

 val df2 = df.rdd.map(x => (x.getString(0),  x.toSeq.tail.filter(_ != 0).length)).toDF("subaccid", "count")

 df2.show

 //output
 +--------+-----+
|subaccid|count|
+--------+-----+
|     AAA|    3|
|     AAB|    7|
|     AAC|    4|
|     ZZZ|    1|
+--------+-----+

Конечно, это включает в себя преобразование в rdd и обратно.

...