Пользовательский агрегатор Spark, возвращающий строку - PullRequest
2 голосов
/ 01 августа 2020

Я пытаюсь изменить пример в https://medium.com/build-and-learn/spark-aggregating-your-data-the-fast-way-e37b53314fad для работы с произвольной строкой. Цель состоит в том, чтобы вернуть «последнюю» строку группы.

Агрегатор реализован таким образом

class Latest(val f: Row => String, val schema: StructType) extends Aggregator[Row, (String, Row), Row] {
  override def zero: (String, Row) = ("0000-00-00", null)
  override def reduce(b: (String, Row), a: Row): (String, Row) = merge(b, (f(a), a))
  override def merge(b1: (String, Row), b2: (String, Row)): (String, Row) = Seq(b1, b2).maxBy(_._1)
  override def finish(reduction: (String, Row)): Row = reduction._2

  override def bufferEncoder: Encoder[(String, Row)] = Encoders.product[(String, Row)]
  override def outputEncoder: Encoder[Row] = RowEncoder(schema)
}

Я тестирую этот агрегатор с помощью следующего кода

class AggregatorSpec
    extends FunSpec
    with DataFrameComparer
    with SparkSessionTestWrapper {

  import spark.implicits._

  describe("main") {

    it("works") {

        val spark = SparkSession
          .builder
          .master("local")
          .appName("common typed aggregator implementations")
          .getOrCreate()

        val df = Seq(
          ("ham", "2019-01-01", 3L, "Yah"),
          ("cheese", "2018-12-31", 4L, "Woo"),       
          ("fish", "2019-01-02", 5L, "Hah"),
          ("grain", "2019-01-01", 6L, "Community"),
          ("grain", "2019-01-02", 7L, "Community"),
          ("ham", "2019-01-04", 3L, "jamón")
        ).toDF("Key", "Date", "Numeric", "Text")

        println("input data:")
        df.show()

        println("running latest:")
        df.groupByKey(_.getString(0)).agg(new Latest(_.getString(1), ds.schema).toColumn).show()

        spark.stop()
    }
  }
}

Выполнение приведенного выше кода приводит к следующей ошибке:

[info] - runs *** FAILED ***
[info]   java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
[info] - field (class: "org.apache.spark.sql.Row", name: "_2")
[info] - root class: "scala.Tuple2"
[info]   at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:625)
[info]   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:619)
[info]   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:607)
[info]   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
[info]   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
[info]   at scala.collection.immutable.List.foreach(List.scala:381)
[info]   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
[info]   at scala.collection.immutable.List.flatMap(List.scala:344)
[info]   at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:607)
[info]   at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:438)

Я относительно новичок как в Spark, так и в Scala, и я даже не уверен, что это вообще возможно, чего я пытаюсь достичь.

1 Ответ

2 голосов
/ 01 августа 2020

Проблема в создании bufferEncoder - Измените на это

override def bufferEncoder: Encoder[(String, Row)] = Encoders.tuple(Encoders.STRING, RowEncoder(schema))

Я надеюсь, что это наивный пример, и вы хотели попробовать Aggregator. Если нет, есть альтернатива для достижения того же результата без агрегатора

 df.groupBy("Key").agg(max(struct("Date", "Numeric", "Text", "key")))
      .show(false)
...