Мне нужно перебрать кадр данных в определенном порядке и применить некоторую сложную логику для вычисления нового столбца.
В приведенном ниже примере я буду использовать простое выражение, где текущее значение для s
- это умножение всех предыдущих значений, поэтому может показаться, что это можно сделать с помощью UDF или даже аналитических функций. Однако в действительности логика намного сложнее.
Ниже код делает то, что нужно
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
val q = """
select 10 x, 1 y
union all select 10, 2
union all select 10, 3
union all select 20, 6
union all select 20, 4
union all select 20, 5
"""
val df = spark.sql(q)
def f_row(iter: Iterator[Row]) : Iterator[Row] = {
iter.scanLeft(Row(0,0,1)) {
case (r1, r2) => {
val (x1, y1, s1) = r1 match {case Row(x: Int, y: Int, s: Int) => (x, y, s)}
val (x2, y2) = r2 match {case Row(x: Int, y: Int) => (x, y)}
Row(x2, y2, s1 * y2)
}
}.drop(1)
}
val schema = new StructType().
add(StructField("x", IntegerType, true)).
add(StructField("y", IntegerType, true)).
add(StructField("s", IntegerType, true))
val encoder = RowEncoder(schema)
df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show
выход
scala> df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show
+---+---+---+
| x| y| s|
+---+---+---+
| 20| 4| 4|
| 20| 5| 20|
| 20| 6|120|
| 10| 1| 1|
| 10| 2| 2|
| 10| 3| 6|
+---+---+---+
Что мне не нравится в этом, это
1) Я явно определяю схему, хотя Spark может выводить имена и типы для фрейма данных
scala> df
res1: org.apache.spark.sql.DataFrame = [x: int, y: int]
2) Если я добавлю какой-либо новый столбец во фрейм данных, мне придется снова объявлять схему, а что еще более раздражает - переопределить функцию!
Предположим, что во фрейме данных есть новый столбец z
. В этом случае мне нужно изменить почти каждую строку в f_row
.
def f_row(iter: Iterator[Row]) : Iterator[Row] = {
iter.scanLeft(Row(0,0,"",1)) {
case (r1, r2) => {
val (x1, y1, z1, s1) = r1 match {case Row(x: Int, y: Int, z: String, s: Int) => (x, y, z, s)}
val (x2, y2, z2) = r2 match {case Row(x: Int, y: Int, z: String) => (x, y, z)}
Row(x2, y2, z2, s1 * y2)
}
}.drop(1)
}
val schema = new StructType().
add(StructField("x", IntegerType, true)).
add(StructField("y", IntegerType, true)).
add(StructField("z", StringType, true)).
add(StructField("s", IntegerType, true))
val encoder = RowEncoder(schema)
df.withColumn("z", lit("dummy")).repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show
выход
scala> df.withColumn("z", lit("dummy")).repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show
+---+---+-----+---+
| x| y| z| s|
+---+---+-----+---+
| 20| 4|dummy| 4|
| 20| 5|dummy| 20|
| 20| 6|dummy|120|
| 10| 1|dummy| 1|
| 10| 2|dummy| 2|
| 10| 3|dummy| 6|
+---+---+-----+---+
Есть ли способ реализовать логику в более общем виде, поэтому мне не нужно создавать функцию для итерации по каждому конкретному фрейму данных?
Или, по крайней мере, избежать изменений кода после добавления новых столбцов во фрейм данных, которые не используются в логике вычислений.
Пожалуйста, см. Обновленный вопрос ниже.
Обновление
Ниже приведены два варианта итерации в более общем виде, но все же с некоторыми недостатками.
// option 1
def f_row(iter: Iterator[Row]): Iterator[Row] = {
val r = Row.fromSeq(Row(0, 0).toSeq :+ 1)
iter.scanLeft(r)((r1, r2) =>
Row.fromSeq(r2.toSeq :+ r1.getInt(r1.size - 1) * r2.getInt(r2.fieldIndex("y")))
).drop(1)
}
df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show
// option 2
def f_row(iter: Iterator[Row]): Iterator[Row] = {
iter.map{
var s = 1
r => {
s = s * r.getInt(r.fieldIndex("y"))
Row.fromSeq(r.toSeq :+ s)
}
}
}
df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show
Если в столбец данных добавлен новый столбец, то в варианте 1 необходимо изменить начальное значение для iter.scanLeft. Также мне не очень нравится вариант 2, поскольку он использует изменяемые переменные.
Есть ли способ улучшить код, чтобы он был чисто функциональным и не требовал никаких изменений при добавлении нового столбца во фрейм данных?