Scala Spark: сглаживание массива структур ключей / значений - PullRequest
0 голосов
/ 08 мая 2020

У меня есть входной фрейм данных, который содержит столбец типа массива. Каждая запись в массиве представляет собой структуру, состоящую из ключа (примерно из четырех значений) и значения. Я хочу превратить это в фрейм данных с одним столбцом для каждого возможного ключа и нулевыми значениями, если это значение отсутствует в массиве для этой строки. Ключи никогда не дублируются ни в одном из массивов, но они могут быть неисправны или отсутствовать.

Пока что лучшее, что у меня есть, это

val wantedCols =df.columns
  .filter(_ != arrayCol)
  .filter(_ != "col")
val flattened = df
        .select((wantedCols.map(col(_)) ++ Seq(explode(col(arrayCol)))):_*)
        .groupBy(wantedCols.map(col(_)):_*)
        .pivot("col.key")
        .agg(first("col.value"))

Это именно то, что я хочу , но это ужасно, и я понятия не имею, каковы будут последствия группировки по каждому столбцу, кроме одного. Какой ПРАВИЛЬНЫЙ способ сделать это?

РЕДАКТИРОВАТЬ: Пример ввода / вывода:

case class testStruct(name : String, number : String)
val dfExampleInput = Seq(
(0, "KY", Seq(testStruct("A", "45"))),
(1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))))
.toDF("index", "state", "entries")
.show

+-----+-----+------------------+
|index|state|           entries|
+-----+-----+------------------+
|    0|   KY|         [[A, 45]]|
|    1|   OR|[[A, 30], [B, 10]]|
+-----+-----+------------------+

val dfExampleOutput = Seq(
  (0, "KY", "45", null),
  (1, "OR", "30", "10"))
  .toDF("index", "state", "A", "B")
  .show

+-----+-----+---+----+
|index|state|  A|   B|
+-----+-----+---+----+
|    0|   KY| 45|null|
|    1|   OR| 30|  10|
+-----+-----+---+----+

ДАЛЬНЕЙШИЕ РЕДАКТИРОВАНИЯ:

Я сам отправил решение (см. Ниже), которое обрабатывает это хорошо, если вы знаете ключи заранее (в моем случае я знаю). Если поиск ключей является проблемой, другой ответ содержит код для ее решения.

Ответы [ 4 ]

0 голосов
/ 04 июня 2020

Я сам разработал решение:

def extractFromArray(colName : String, key : String, numKeys : Int, keyName : String) = {
  val indexCols = (0 to numKeys-1).map(col(colName).getItem(_))
  indexCols.foldLeft(lit(null))((innerCol : Column, indexCol : Column) =>
      when(indexCol.isNotNull && (indexCol.getItem(keyName) === key), indexCol)
      .otherwise(innerCol))
}

Пример:

case class testStruct(name : String, number : String)
val df = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))),
  (2, "FL", Seq(testStruct("A", "30"), testStruct("B", "10"), testStruct("C", "20"))),
  (3, "TX", Seq(testStruct("B", "60"), testStruct("A", "19"), testStruct("C", "40")))
)
.toDF("index", "state", "entries")
.withColumn("A", extractFromArray("entries", "B", 3, "name"))
.show

, что дает:

+-----+-----+--------------------+-------+
|index|state|             entries|      A|
+-----+-----+--------------------+-------+
|    0|   KY|           [[A, 45]]|   null|
|    1|   OR|  [[A, 30], [B, 10]]|[B, 10]|
|    2|   FL|[[A, 30], [B, 10]...|[B, 10]|
|    3|   TX|[[B, 60], [A, 19]...|[B, 60]|
+-----+-----+--------------------+-------+

Это решение немного отличается из других ответов:

  • Он работает только с одним ключом за раз
  • Он требует, чтобы имя ключа и количество ключей были известны заранее
  • Он производит столбец структур, вместо того, чтобы выполнять дополнительный шаг по извлечению спецификаций c значений
  • Он работает как простая операция от столбца к столбцу, а не требует преобразований всего DF
  • Его можно вычислить лениво.

Первые три проблемы можно решить с помощью вызова кода и оставить его несколько более гибким для случаев, когда вы уже знаете ключи или где структуры содержат дополнительные значения для извлечения.

0 голосов
/ 11 мая 2020

Без groupBy pivot agg first

Пожалуйста, проверьте код ниже.

scala> val df = Seq((0, "KY", Seq(("A", "45"))),(1, "OR", Seq(("A", "30"),("B", "10")))).toDF("index", "state", "entries").withColumn("entries",$"entries".cast("array<struct<name:string,number:string>>"))
df: org.apache.spark.sql.DataFrame = [index: int, state: string ... 1 more field]

scala> df.printSchema
root
 |-- index: integer (nullable = false)
 |-- state: string (nullable = true)
 |-- entries: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- number: string (nullable = true)


scala> df.show(false)
+-----+-----+------------------+
|index|state|entries           |
+-----+-----+------------------+
|0    |KY   |[[A, 45]]         |
|1    |OR   |[[A, 30], [B, 10]]|
+-----+-----+------------------+


scala> val finalDFColumns = df.select(explode($"entries").as("entries")).select("entries.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect.foldLeft(df.limit(0))((cdf,c) => cdf.withColumn(c,lit(null))).columns
finalDFColumns: Array[String] = Array(index, state, entries, A, B)

scala> val finalDF = df.select($"*" +: (0 until max).map(i => $"entries".getItem(i)("number").as(i.toString)): _*)
finalDF: org.apache.spark.sql.DataFrame = [index: int, state: string ... 3 more fields]

scala> finalDF.show(false)
+-----+-----+------------------+---+----+
|index|state|entries           |0  |1   |
+-----+-----+------------------+---+----+
|0    |KY   |[[A, 45]]         |45 |null|
|1    |OR   |[[A, 30], [B, 10]]|30 |10  |
+-----+-----+------------------+---+----+


scala> finalDF.printSchema
root
 |-- index: integer (nullable = false)
 |-- state: string (nullable = true)
 |-- entries: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- number: string (nullable = true)
 |-- 0: string (nullable = true)
 |-- 1: string (nullable = true)

scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).show(false)
+-----+-----+------------------+---+----+
|index|state|entries           |A  |B   |
+-----+-----+------------------+---+----+
|0    |KY   |[[A, 45]]         |45 |null|
|1    |OR   |[[A, 30], [B, 10]]|30 |10  |
+-----+-----+------------------+---+----+



scala>

Конечный результат


scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).drop($"entries").show(false)
+-----+-----+---+----+
|index|state|A  |B   |
+-----+-----+---+----+
|0    |KY   |45 |null|
|1    |OR   |30 |10  |
+-----+-----+---+----+

0 голосов
/ 12 мая 2020

Вот еще один способ, основанный на предположении, что в столбце entries нет дубликатов, т.е. Seq(testStruct("A", "30"), testStruct("A", "70"), testStruct("B", "10")) вызовет ошибку. Следующее решение сочетает в себе API RDD и Dataframe для реализации:

import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.types.StructType

case class testStruct(name : String, number : String)
val df = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))),
  (2, "FL", Seq(testStruct("A", "30"), testStruct("B", "10"), testStruct("C", "20"))),
  (3, "TX", Seq(testStruct("B", "60"), testStruct("A", "19"), testStruct("C", "40")))
)
.toDF("index", "state", "entries")
.cache

// get all possible keys from entries i.e Seq[A, B, C]
val finalCols = df.select(explode($"entries").as("entry"))
                  .select($"entry".getField("name").as("entry_name"))
                  .distinct
                  .collect
                  .map{_.getAs[String]("entry_name")}
                  .sorted // Attention: we need to retain the order of the columns 
                          // 1. when generating row values and
                          // 2. when creating the schema

val rdd = df.rdd.map{ r =>
  // transform the entries array into a map i.e Map(A -> 30, B -> 10)
  val entriesMap = r.getSeq[Row](2).map{r => (r.getString(0), r.getString(1))}.toMap

  // transform finalCols into a map with null value i.e Map(A -> null, B -> null, C -> null)
  val finalColsMap = finalCols.map{c => (c, null)}.toMap

  // replace null values with those that are present from the current row by merging the two previous maps
  // Attention: this should retain the order of finalColsMap
  val merged = finalColsMap ++ entriesMap

  // concatenate the two first row values ["index", "state"] with the values from merged
  val finalValues = Seq(r(0), r(1)) ++ merged.values

  Row.fromSeq(finalValues)
}

val extraCols = finalCols.map{c => s"`${c}` STRING"}
val schema = StructType.fromDDL("`index` INT, `state` STRING," + extraCols.mkString(","))

val finalDf = spark.createDataFrame(rdd, schema)

finalDf.show
// +-----+-----+---+----+----+
// |index|state|  A|   B|   C|
// +-----+-----+---+----+----+
// |    0|   KY| 45|null|null|
// |    1|   OR| 30|  10|null|
// |    2|   FL| 30|  10|  20|
// |    3|   TX| 19|  60|  40|
// +-----+-----+---+----+----+

Примечание: решение требует одного дополнительного действия для получения уникальных ключей, хотя это не так. не вызывает перетасовки, так как она основана только на узких преобразованиях.

0 голосов
/ 09 мая 2020

Я бы не стал беспокоиться слишком насчет группировки по нескольким столбцам, кроме того, что это может сбить с толку. В этом ключе, если есть более простой и удобный способ, go для него. Без примера ввода / вывода я не уверен, что это приведет вас к тому месту, где вы пытаетесь go, но, возможно, это будет полезно:

Seq(Seq("k1" -> "v1", "k2" -> "v2")).toDS() // some basic input based on my understanding of your description
  .select(explode($"value")) // flatten the array
  .select("col.*") // de-nest the struct
  .groupBy("_2") // one row per distinct value
  .pivot("_1") // one column per distinct key
  .count // or agg(first) if you want the value in each column
  .show
+---+----+----+
| _2|  k1|  k2|
+---+----+----+
| v2|null|   1|
| v1|   1|null|
+---+----+----+

На основе того, что вы сейчас сказал, у меня сложилось впечатление, что есть много столбцов, таких как "состояние", которые не требуются для агрегирования, но должны быть в конечном результате.

Для справки, если вам не нужно было сводить , вы можете добавить столбец структуры со всеми такими вложенными полями, а затем добавить его в свою агрегацию, например: .agg(first($"myStruct"), first($"number")). Основное преимущество состоит в том, что в groubBy указаны только фактические ключевые столбцы. Но при использовании pivot все становится немного странно, поэтому мы отложим эту опцию.

В этом варианте использования самый простой способ, который я мог придумать, - это разделить ваш фрейм данных и снова объединить его после агрегации используя некоторый rowkey. В этом примере я предполагаю, что "index" подходит для этой цели:

 val mehCols = dfExampleInput.columns.filter(_ != "entries").map(col)
 val mehDF = dfExampleInput.select(mehCols:_*)
 val aggDF = dfExampleInput
   .select($"index", explode($"entries").as("entry"))
   .select($"index", $"entry.*")
   .groupBy("index")
   .pivot("name")
   .agg(first($"number"))

 scala> mehDF.join(aggDF, Seq("index")).show
 +-----+-----+---+----+
 |index|state|  A|   B|
 +-----+-----+---+----+
 |    0|   KY| 45|null|
 |    1|   OR| 30|  10|
 +-----+-----+---+----+

Я сомневаюсь, что вы заметите большую разницу в производительности, если таковая имеется. Может быть, в крайних случаях, например: очень много meh столбцов, или очень много сводных столбцов, или что-то в этом роде, или, может быть, вообще ничего. Лично я бы протестировал и то, и другое с вводом приличного размера, и, если не было значительной разницы, использовал бы тот, который кажется более простым в обслуживании.

...