Spark DataFrame: selecting a column by row value
0 votes
/ 14 July 2020

I have a DataFrame with only a single row.

df = spark.createDataFrame([(1, 2, 10, 3, 4)], ['a', 'b', 'c', 'd', 'e'])

But the number of columns is large, around 20,000. Now I want to select the columns whose value exceeds a threshold, for example 5. I tried converting the DataFrame to a dict to count them, but I run into a max heap size error.

Here is the expected output:

+---+
|  c|
+---+
| 10|
+---+

1 Answer

1 vote
/ 14 July 2020

Perhaps this is helpful -

Transpose and filter

    val threshold = 5

    // Build a single-row DataFrame with 99 integer columns: col1 = 1, col2 = 2, ...
    val cols = Range(1, 100).map(f => s"$f as col$f").mkString(", ")
    val df1 = spark.sql(s"select $cols")
    df1.show(false)
    df1.printSchema()
    /**
      * +----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
      * |col1|col2|col3|col4|col5|col6|col7|col8|col9|col10|col11|col12|col13|col14|col15|col16|col17|col18|col19|col20|col21|col22|col23|col24|col25|col26|col27|col28|col29|col30|col31|col32|col33|col34|col35|col36|col37|col38|col39|col40|col41|col42|col43|col44|col45|col46|col47|col48|col49|col50|col51|col52|col53|col54|col55|col56|col57|col58|col59|col60|col61|col62|col63|col64|col65|col66|col67|col68|col69|col70|col71|col72|col73|col74|col75|col76|col77|col78|col79|col80|col81|col82|col83|col84|col85|col86|col87|col88|col89|col90|col91|col92|col93|col94|col95|col96|col97|col98|col99|
      * +----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
      * |1   |2   |3   |4   |5   |6   |7   |8   |9   |10   |11   |12   |13   |14   |15   |16   |17   |18   |19   |20   |21   |22   |23   |24   |25   |26   |27   |28   |29   |30   |31   |32   |33   |34   |35   |36   |37   |38   |39   |40   |41   |42   |43   |44   |45   |46   |47   |48   |49   |50   |51   |52   |53   |54   |55   |56   |57   |58   |59   |60   |61   |62   |63   |64   |65   |66   |67   |68   |69   |70   |71   |72   |73   |74   |75   |76   |77   |78   |79   |80   |81   |82   |83   |84   |85   |86   |87   |88   |89   |90   |91   |92   |93   |94   |95   |96   |97   |98   |99   |
      * +----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
      *
      * root
      * |-- col1: integer (nullable = false)
      * |-- col2: integer (nullable = false)
      * |-- col3: integer (nullable = false)
      * |-- col4: integer (nullable = false)
      * ...
      */

    // Pair each column name with its (int-cast) value: 'col1', col1, 'col2', col2, ...
    val stringCol = df1.columns.map(c => s"'$c', cast(`$c` as int)").mkString(", ")

    // stack(n, k1, v1, ..., kn, vn) unpivots the single row into n (name, value) rows,
    // so the threshold check becomes an ordinary row filter
    val processedDF = df1.selectExpr(s"stack(${df1.columns.length}, $stringCol) as (name, value)")
      .filter(s"value > $threshold")
    processedDF.show(false)
    /**
      * +-----+-----+
      * |name |value|
      * +-----+-----+
      * |col6 |6    |
      * |col7 |7    |
      * |col8 |8    |
      * |col9 |9    |
      * |col10|10   |
      * |col11|11   |
      * |col12|12   |
      * |col13|13   |
      * |col14|14   |
      * |col15|15   |
      * |col16|16   |
      * |col17|17   |
      * |col18|18   |
      * |col19|19   |
      * |col20|20   |
      * |col21|21   |
      * |col22|22   |
      * |col23|23   |
      * |col24|24   |
      * |col25|25   |
      * +-----+-----+
      * only showing top 20 rows
      */
...
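Since the question itself uses PySpark, here is a minimal sketch of the same stack-and-filter approach translated to Python, applied to the asker's five-column DataFrame (the 20,000-column case works identically, since the column list is built once and the unpivot runs inside a single selectExpr):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([(1, 2, 10, 3, 4)], ['a', 'b', 'c', 'd', 'e'])
    threshold = 5

    # Pair each column name with its value: 'a', `a`, 'b', `b`, ...
    stack_args = ", ".join(f"'{c}', `{c}`" for c in df.columns)

    # stack() unpivots the single row into one (name, value) row per column,
    # so the threshold check becomes an ordinary row filter
    result = (df
        .selectExpr(f"stack({len(df.columns)}, {stack_args}) as (name, value)")
        .filter(f"value > {threshold}"))

    result.show()
    # +----+-----+
    # |name|value|
    # +----+-----+
    # |   c|   10|
    # +----+-----+

Because the unpivot happens inside Spark SQL rather than by collecting the row to the driver, this avoids the heap-size error that the dict-based attempt ran into.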