Вывести фрейм данных с использованием foldLeft на существующий фрейм данных - PullRequest
3 голосов
/ 22 апреля 2019

У меня есть искровой фрейм данных, к которому я хочу применить агрегатные функции, используя foldLeft (или любой другой метод) для каждого столбца.Агрегатные функции, применяемые к столбцу, будут зависеть от типа данных столбца.

Обратите внимание, что, поскольку я буду работать с большим фреймом данных, я не хочу использовать .collect() или что-либо, что записывает много вещей в драйвер.

Исходный фрейм данных выглядит такследующее:

+----------------+-----------------+------------------+
| id(StringType) | lat(DoubleType) | long(DoubleType) |
+----------------+-----------------+------------------+
| ID1            | 10.2            | 20.1             |
| ID2            | 11.1            | 50.1             |
| ID3            | null            | null             |
+----------------+-----------------+------------------+

В этом примере я хочу вычислить количество нулей для всех типов данных, вычислять только среднее для DoubleType и вычислять только количество элементов для столбцов StringType.

Вот мой скелетный код, который реализует foldLeft, но, возможно, это не правильный путь.

def ourMethod(df: DataFrame): DataFrame = {
  val columns = df.schema.fields
  val initDf = spark.emptyDataFrame
  columns.foldLeft(...)((tempDf, column) => {
    column match {
      case StructField(name, dataType, _, _) => {
        dataType match {
          case StringType => ... //something like df.select("column").approx_count_distinct(), though writes in driver.
          case DoubleType => ... //something like df.agg(avg(column))
        }
      }
    }
  })
}

Ожидаемый результат выглядит следующим образом:

+----------+---------+-------+-------------+
| col_name | is_null | mean  | cardinality |
+----------+---------+-------+-------------+
| id       |       0 | null  | 3           |
| lat      |       1 | 10.65 | null        |
| long     |       1 | 35.1  | null        |
+----------+---------+-------+-------------+

1 Ответ

1 голос
/ 22 апреля 2019

Не уверен, поможет ли foldLeft здесь, но это определенно выполнимо. Заданный фрейм данных

val df =
  Seq(("ID1", Some(10.2), Some(20.1)),
      ("ID2", Some(11.1), Some(50.1)),
      ("ID3", None, None))
    .toDF("id", "lat", "lon")

мы можем сделать пару подходов.

  1. Программно создавать агрегатные функции. Довольно просто
val aggs = df.schema.fields.flatMap {
  case StructField(name, DoubleType, _, _) =>
    Seq(max(col(name).isNull) as s"${name}_is_null",
        mean(col(name)) as s"${name}_mean")
  case StructField(name, StringType, _, _) =>
    Seq(max(col(name).isNull) as s"${name}_is_null",
        max(length(col(name))) as s"${name}_cardinality")
}

df.agg(aggs.head, aggs.tail: _*).show()

Однако вывод будет в одну строку, а не точно, что было задано. Конечно, этот единственный ряд может быть, например, собирается в драйвер и модифицируется или отображается в нужном формате. Это необработанный вывод:

+----------+--------------+-----------+------------------+-----------+--------+
|id_is_null|id_cardinality|lat_is_null|          lat_mean|lon_is_null|lon_mean|
+----------+--------------+-----------+------------------+-----------+--------+
|     false|             3|       true|10.649999999999999|       true|    35.1|
+----------+--------------+-----------+------------------+-----------+--------+
  1. Расширение строк до формата, в котором имя строки является столбцом, который можно использовать для группировки, а возможные значения оборачиваются в пустые поля. Это работает, так как значения null опущены в агрегатах
case class FlatRow(name: String, d: Option[Double], s: Option[String])

df.flatMap { row: Row =>
    row.schema.fields.zipWithIndex.map {
      case (StructField(name, DoubleType, _, _), index) =>
        FlatRow(name,
                if (row.isNullAt(index)) None
                else Some(row.getDouble(index)),
                None)
      case (StructField(name, StringType, _, _), index) =>
        FlatRow(name,
                None,
                if (row.isNullAt(index)) None
                else Some(row.getString(index)))
    }
  }
  .groupBy($"name")
  .agg(max($"d".isNull && $"s".isNull) as "is_null",
       mean($"d") as "mean",
       max(length($"s")) as "cardinality")
  .show()

Немного больше кода, но он выводит формат, который был задан:

+----+-------+------------------+-----------+
|name|is_null|              mean|cardinality|
+----+-------+------------------+-----------+
| lat|   true|10.649999999999999|       null|
| lon|   true|              35.1|       null|
|  id|  false|              null|          3|
+----+-------+------------------+-----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...