Как отключить большой спарк-фрейм? - PullRequest
1 голос
/ 13 февраля 2020

Я видел несколько решений для unpivot искрового фрейма данных, когда количество столбцов достаточно мало и имена столбцов могут быть жестко закодированы. Есть ли у вас масштабируемое решение для разворота фрейма данных с многочисленными столбцами?

Ниже приведена проблема с игрушкой.

Ввод:

  val df = Seq(
    (1,1,1,0),
    (2,0,0,1)    
  ).toDF("ID","A","B","C")

+---+--------+----+
| ID|  A | B | C  |
+---+--------+-----
|  1|  1 | 1 | 0  |
|  2|  0 | 0 | 1  |
+---+----------+--+

ожидаемый результат:

+---+-----+-----+
| ID|names|count|
+---+-----------|
|  1|  A  |  1  |
|  1|  B  |  1  |
|  1|  C  |  0  |
|  2|  A  |  0  |
|  2|  B  |  0  |
|  2|  C  |  1  |
+---+-----------+

Решение должно быть применимо к наборам данных с N столбцами для разворота, где N большое (скажем, 100 столбцов).

Ответы [ 2 ]

3 голосов
/ 13 февраля 2020

Это должно работать, я предполагаю, что вы знаете список столбцов, которые вы хотите отключить на

import org.apache.spark.sql.{functions => func, _}

val df = Seq(
    (1,1,1,0),
    (2,0,0,1)    
  ).toDF("ID","A","B","C")

val cols = Seq("A", "B", "C")

df.select(
    $"ID",
    func.explode(
        func.array(
            cols.map(
                col =>
                    func.struct(    
                        func.lit(col).alias("names"),
                        func.col(col).alias("count")
                    )
            ): _*
        )
    ).alias("v")
)
.selectExpr("ID", "v.*")
0 голосов
/ 13 февраля 2020

Это можно сделать чистой искрой Sql, сложив столбцы.

Ниже приведен пример pyspark, который можно легко адаптировать к Scala. Код python важен только для динамического построения Sql на основе соответствующих полей. Я использую этот подход довольно часто.

from pyspark.sql.types import * 

df = spark.createDataFrame([
  {"id" : 1, "A" : 1, "B" : 1, "C" : 0},
  {"id" : 2, "A" : 0, "B" : 0, "C" : 1}
],
StructType([StructField("id", IntegerType()), StructField("A", IntegerType()),StructField("B", IntegerType()) , StructField("C", IntegerType())]))

def features_to_eav(df, subset=None):

  relevant_columns = subset if subset else df.columns
  n = len(relevant_columns)
  cols_to_stack = ", ".join(['\'{c}\', {c}'.format(c=c) for c in relevant_columns]) 
  stack_expression = "stack({}, {}) as (name, value)".format(n, cols_to_stack)

  fetures_to_check_df = df.select(*(["id"] + relevant_columns)).createOrReplaceTempView("features_to_check")

  sql = "select id, {} from features_to_check".format(stack_expression)
  print ("Stacking sql:", sql)
  return spark.sql(sql)

features_to_eav(df, ["A", "B", "C"]).show()

Вывод (обратите внимание на построенный sql):

Stacking sql: select id, stack(3, 'A', A, 'B', B, 'C', C) as (name, value) from features_to_check
+---+----+-----+
| id|name|value|
+---+----+-----+
|  1|   A|    1|
|  1|   B|    1|
|  1|   C|    0|
|  2|   A|    0|
|  2|   B|    0|
|  2|   C|    1|
+---+----+-----+
...