Застегните и разбейте несколько столбцов в кадре данных Spark SQL - PullRequest
1 голос
/ 07 октября 2019

У меня есть фрейм данных следующей структуры:

A: Array[String]   | B: Array[String] | [ ... multiple other columns ...]
=========================================================================
[A, B, C, D]       | [1, 2, 3, 4]     | [ ... array with 4 elements ... ]
[E, F, G, H, I]    | [5, 6, 7, 8, 9]  | [ ... array with 5 elements ... ]
[J]                | [10]             | [ ... array with 1 element ...  ]

Я хочу написать UDF, который

  1. Уплотняет элементы в i-й позиции каждого столбца вDF
  2. взрывает DF на каждом из этих сжатых кортежей

Получившийся столбец должен выглядеть следующим образом:

ZippedAndExploded: Array[String]
=================================
[A, 1, ...]
[B, 2, ...]
[C, 3, ...]
[D, 4, ...]
[E, 5, ...]
[F, 6, ...]
[G, 7, ...]
[H, 8, ...]
[I, 9, ...]
[J, 10, ...]

В данный момент я используюмножественный вызов (по одному на имя столбца, список имен столбцов собирается во время выполнения) в UDF, например:

val myudf6 = udf((xa:Seq[Seq[String]],xb:Seq[String]) => {
  xa.indices.map(i => {
    xa(i) :+ xb(i) // Add one element to the zip column
  })
})

val allColumnNames = df.columns.filter(...)    

for (columnName <- allColumnNames) {
  df = df.withColumn("zipped", myudf8(df("zipped"), df(columnName))
}
df = df.explode("zipped")

Поскольку в кадре данных могут быть сотни столбцов, этот итеративный вызов withColumn кажется, занимает много времени.

Вопрос (ы): возможно ли это сделать с одним UDF и одним DF.withColumn(...) вызовом?

Важно : UDF долженzip динамическое число столбцов (читается во время выполнения).

Ответы [ 2 ]

2 голосов
/ 08 октября 2019

Используйте UDF, который принимает переменное число столбцов в качестве входных данных. Это можно сделать с помощью массива массивов (при условии, что типы одинаковы). Так как у вас есть массив массивов, можно использовать transpose, который даст те же результаты, что и сжатые списки. Полученный массив можно затем взорвать.

val array_zip_udf = udf((cols: Seq[Seq[String]]) => {
  cols.transpose
})

val allColumnNames = df.columns.filter(...).map(col)
val df2 = df.withColumn("exploded", explode(array_zip_udf(array(allColumnNames: _*))))

Обратите внимание, что в Spark 2.4 + можно будет использовать arrays_zip вместо UDF:

val df2 = df.withColumn("exploded", explode(arrays_zip(allColumnNames: _*)))
0 голосов
/ 07 октября 2019

, если вы знаете и уверены в своем количестве значений в массиве, ниже может быть одно из самых простых решений

select A[0], B[0]..... from your_table
union all
select A[1], B[1]..... from your_table
union all
select A[2], B[2]..... from your_table
union all
select A[3], B[3]..... from your_table
...