Общий итератор над фреймом данных (Spark / scala) - PullRequest
0 голосов
/ 05 ноября 2018

Мне нужно перебрать кадр данных в определенном порядке и применить некоторую сложную логику для вычисления нового столбца.

В приведенном ниже примере я буду использовать простое выражение, где текущее значение для s - это умножение всех предыдущих значений, поэтому может показаться, что это можно сделать с помощью UDF или даже аналитических функций. Однако в действительности логика намного сложнее.

Ниже код делает то, что нужно

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val q = """
select 10 x, 1 y
union all select 10, 2
union all select 10, 3
union all select 20, 6
union all select 20, 4
union all select 20, 5
"""
val df = spark.sql(q)
def f_row(iter: Iterator[Row]) : Iterator[Row] = {
  iter.scanLeft(Row(0,0,1)) {
    case (r1, r2) => {
      val (x1, y1, s1) = r1 match {case Row(x: Int, y: Int, s: Int) => (x, y, s)}
      val (x2, y2)     = r2 match {case Row(x: Int, y: Int) => (x, y)}
      Row(x2, y2, s1 * y2)
    }
  }.drop(1)
}
val schema = new StructType().
             add(StructField("x", IntegerType, true)).
             add(StructField("y", IntegerType, true)).
             add(StructField("s", IntegerType, true))
val encoder = RowEncoder(schema)
df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show

выход

scala> df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show
+---+---+---+
|  x|  y|  s|
+---+---+---+
| 20|  4|  4|
| 20|  5| 20|
| 20|  6|120|
| 10|  1|  1|
| 10|  2|  2|
| 10|  3|  6|
+---+---+---+

Что мне не нравится в этом, это

1) Я явно определяю схему, хотя Spark может выводить имена и типы для фрейма данных

scala> df
res1: org.apache.spark.sql.DataFrame = [x: int, y: int]

2) Если я добавлю какой-либо новый столбец во фрейм данных, мне придется снова объявлять схему, а что еще более раздражает - переопределить функцию!

Предположим, что во фрейме данных есть новый столбец z. В этом случае мне нужно изменить почти каждую строку в f_row.

def f_row(iter: Iterator[Row]) : Iterator[Row] = {
  iter.scanLeft(Row(0,0,"",1)) {
    case (r1, r2) => {
      val (x1, y1, z1, s1) = r1 match {case Row(x: Int, y: Int, z: String, s: Int) => (x, y, z, s)}
      val (x2, y2, z2)     = r2 match {case Row(x: Int, y: Int, z: String) => (x, y, z)}
      Row(x2, y2, z2, s1 * y2)
    }
  }.drop(1)
}
val schema = new StructType().
             add(StructField("x", IntegerType, true)).
             add(StructField("y", IntegerType, true)).
             add(StructField("z", StringType, true)).
             add(StructField("s", IntegerType, true))
val encoder = RowEncoder(schema)
df.withColumn("z", lit("dummy")).repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show

выход

scala> df.withColumn("z", lit("dummy")).repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show
+---+---+-----+---+
|  x|  y|    z|  s|
+---+---+-----+---+
| 20|  4|dummy|  4|
| 20|  5|dummy| 20|
| 20|  6|dummy|120|
| 10|  1|dummy|  1|
| 10|  2|dummy|  2|
| 10|  3|dummy|  6|
+---+---+-----+---+

Есть ли способ реализовать логику в более общем виде, поэтому мне не нужно создавать функцию для итерации по каждому конкретному фрейму данных? Или, по крайней мере, избежать изменений кода после добавления новых столбцов во фрейм данных, которые не используются в логике вычислений.

Пожалуйста, см. Обновленный вопрос ниже.

Обновление

Ниже приведены два варианта итерации в более общем виде, но все же с некоторыми недостатками.

// option 1
def f_row(iter: Iterator[Row]): Iterator[Row] = {
  val r = Row.fromSeq(Row(0, 0).toSeq :+ 1)
  iter.scanLeft(r)((r1, r2) => 
    Row.fromSeq(r2.toSeq :+ r1.getInt(r1.size - 1) * r2.getInt(r2.fieldIndex("y")))
  ).drop(1)
}
df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show

// option 2
def f_row(iter: Iterator[Row]): Iterator[Row] = {
  iter.map{
    var s = 1
    r => {
      s = s * r.getInt(r.fieldIndex("y"))
      Row.fromSeq(r.toSeq :+ s)
    }
  }
}
df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show

Если в столбец данных добавлен новый столбец, то в варианте 1 необходимо изменить начальное значение для iter.scanLeft. Также мне не очень нравится вариант 2, поскольку он использует изменяемые переменные.

Есть ли способ улучшить код, чтобы он был чисто функциональным и не требовал никаких изменений при добавлении нового столбца во фрейм данных?

1 Ответ

0 голосов
/ 06 ноября 2018

Ну, достаточное решение ниже

def f_row(iter: Iterator[Row]): Iterator[Row] = {
  if (iter.hasNext) {
    val head = iter.next
    val r = Row.fromSeq(head.toSeq :+ head.getInt(head.fieldIndex("y")))
    iter.scanLeft(r)((r1, r2) => 
      Row.fromSeq(r2.toSeq :+ r1.getInt(r1.size - 1) * r2.getInt(r2.fieldIndex("y"))))
  } else iter
}
val encoder = 
  RowEncoder(StructType(df.schema.fields :+ StructField("s", IntegerType, false)))
df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row)(encoder).show

Обновление

Можно избежать таких функций, как getInt, в пользу более общих getAs.

Кроме того, чтобы иметь возможность доступа к строкам r1 по имени, мы можем сгенерировать GenericRowWithSchema, который является подклассом Row.

Неявный параметр был добавлен в f_row, чтобы функция могла использовать текущую схему фрейма данных, и в то же время его можно использовать в качестве параметра mapPartitions.

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.encoders.RowEncoder

implicit val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
implicit val encoder = RowEncoder(schema)

def mul(x1: Int, x2: Int) = x1 * x2;

def f_row(iter: Iterator[Row])(implicit currentSchema : StructType) : Iterator[Row] = {
  if (iter.hasNext) {
    val head = iter.next
    val r =
      new GenericRowWithSchema((head.toSeq :+ (head.getAs("y"))).toArray, currentSchema)

    iter.scanLeft(r)((r1, r2) =>
      new GenericRowWithSchema((r2.toSeq :+ mul(r1.getAs("result"), r2.getAs("y"))).toArray, currentSchema))
  } else iter
}

df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row).show

Наконец, логика может быть реализована хвостовым рекурсивным способом.

import scala.annotation.tailrec

def f_row(iter: Iterator[Row]) = {
  @tailrec
  def f_row_(iter: Iterator[Row], tmp: Int, result: Iterator[Row]): Iterator[Row] = {
    if (iter.hasNext) {
      val r = iter.next
      f_row_(iter, mul(tmp, r.getAs("y")),
        result ++ Iterator(Row.fromSeq(r.toSeq :+ mul(tmp, r.getAs("y")))))
    } else result
  }
  f_row_(iter, 1, Iterator[Row]())
}

df.repartition($"x").sortWithinPartitions($"y").mapPartitions(f_row).show
...