Групповое объединение GroupBy + для набора данных с классом Case / Trait в ключе - PullRequest
0 голосов
/ 07 января 2020

Я пытаюсь реорганизовать некоторый код и поместить общий лог c в черту. Я в основном хочу обработать наборы данных, сгруппировать их по некоторому ключу и объединить:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{ Dataset, Encoder, Encoders, TypedColumn }

case class SomeKey(a: String, b: Boolean)

case class InputRow(
 SomeKey,
 v: Double
)

trait MyTrait {

  def processInputs: Dataset[InputRow]

  def groupAndAggregate(
    logs: Dataset[InputRow]
  ): Dataset[(SomeKey, Long)] = {
    import logs.sparkSession.implicits._

    logs
      .groupByKey(i => i.key)
      .agg(someAggFunc)

  }
  //Whatever agg function: here, it counts the number of v that are >= 0.5
  def someAggFunc: TypedColumn[InputRow, Long] =
    new Aggregator[
      /*input type*/ InputRow,
      /* "buffer" type */ Long,
      /* output type */ Long
    ] with Serializable {

      def zero = 0L

      def reduce(b: Long, a: InputRow) = {
        if (a.v >= 0.5)
          b + 1
        else
          b
      }

      def merge(b1: Long, b2: Long) =
        b1 + b2

      // map buffer to output type
      def finish(b: Long) = b
      def bufferEncoder: Encoder[Long] = Encoders.scalaLong
      def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }.toColumn
}

все работает отлично: я могу создать экземпляр класса, который наследует от MyTrait, и переопределить способ обработки входных данных:

import spark.implicits._
case class MyTraitTest(testDf: DataFrame) extends MyTrait {
    override def processInputs: Dataset[InputRow] = {
      val ds = testDf
        .select(
          $"a",
          $"b",
          $"v",
        )
        .rdd
        .map(
          r =>
            InputRow(
              SomeKey(r.getAs[String]("a"), r.getAs[Boolean]("b")),
              r.getAs[Double]("v")
          )
        )
        .toDS
      ds
    }
val df: DataFrame = Seq(
 ("1", false, 0.40),
 ("1", false, 0.54),
 ("0", true, 0.85),
 ("1", true, 0.39)
).toDF("a", "b", "v")

val myTraitTest  = MyTraitTest(df)
val ds: Dataset[InputRow] = myTraitTest.processInputs
val res                   = myTraitTest.groupAndAggregate(ds)
res.show(false)

+----------+----------------------------------+
|key       |InputRow                          |
+----------+----------------------------------+
|[1, false]|1                                 |
|[0, true] |1                                 |
|[1, true] |0                                 |
+----------+----------------------------------+

Теперь проблема: я хочу, чтобы SomeKey был производным от более универсального c trait Key, потому что ключ не всегда будет иметь только два поля, поля не будут иметь одинаковый тип et c. Хотя это всегда будет простой кортеж из нескольких базовых c примитивных типов.

Поэтому я попытался сделать следующее:

trait Key extends Product
case class SomeKey(a: String, b: Boolean) extends Key
case class SomeOtherKey(x: Int, y: Boolean, z: String) extends Key

case class InputRow[T <: Key](
   key: T,
   v: Double
)

trait MyTrait[T <: Key] {

  def processInputs: Dataset[InputRow[T]]

  def groupAndAggregate(
    logs: Dataset[InputRow[T]]
  ): Dataset[(T, Long)] = {
    import logs.sparkSession.implicits._

    logs
      .groupByKey(i => i.key)
      .agg(someAggFunc)

  }

  def someAggFunc: TypedColumn[InputRow[T], Long] = {...}

Теперь я делаю:

case class MyTraitTest(testDf: DataFrame) extends MyTrait[SomeKey] {
    override def processInputs: Dataset[InputRow[SomeKey]] = {
      ...
    }

et c.

Но теперь я получаю сообщение об ошибке: Unable to find encoder for type T. An implicit Encoder[T] is needed to store T instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. .groupByKey(i => i.key)

Я действительно не знаю, как обойти эту проблему, я много чего пытался без успеха , Извините за это довольно длинное описание, но, надеюсь, у вас есть все элементы, которые помогут мне понять ... спасибо!

1 Ответ

2 голосов
/ 10 января 2020

Spark должен иметь возможность неявно создавать кодировщик для типа продукта T, поэтому вам нужно помочь ему обойти стирание типа JVM и передать TypeTag для T в качестве неявного параметра вашего метода groupAndAggregate.

Рабочий пример:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{ DataFrame, Dataset, Encoders, TypedColumn }
import scala.reflect.runtime.universe.TypeTag

trait Key extends Product
case class SomeKey(a: String, b: Boolean) extends Key
case class SomeOtherKey(x: Int, y: Boolean, z: String) extends Key

case class InputRow[T <: Key](key: T, v: Double)

trait MyTrait[T <: Key] {

  def processInputs: Dataset[InputRow[T]]

  def groupAndAggregate(
    logs: Dataset[InputRow[T]]
  )(implicit tTypeTag: TypeTag[T]): Dataset[(T, Long)] = {
    import logs.sparkSession.implicits._

    logs
      .groupByKey(i => i.key)
      .agg(someAggFunc)
  }

  def someAggFunc: TypedColumn[InputRow[T], Long] =
    new Aggregator[InputRow[T], Long, Long] with Serializable {

      def reduce(b: Long, a: InputRow[T]) = b + (a.v * 100).toLong

      def merge(b1: Long, b2: Long) = b1 + b2

      def zero = 0L
      def finish(b: Long) = b      
      def bufferEncoder = Encoders.scalaLong
      def outputEncoder = Encoders.scalaLong
    }.toColumn
}

с классом переноса


case class MyTraitTest(testDf: DataFrame) extends MyTrait[SomeKey] {
    import testDf.sparkSession.implicits._
    import org.apache.spark.sql.functions.struct

    override def processInputs = testDf
        .select(struct($"a", $"b") as "key", $"v" )
        .as[InputRow[SomeKey]]
}

и выполнением теста

val df = Seq(
 ("1", false, 0.40),
 ("1", false, 0.54),
 ("0", true, 0.85),
 ("1", true, 0.39)
).toDF("a", "b", "v")

val myTraitTest  = MyTraitTest(df)
val ds = myTraitTest.processInputs
val res = myTraitTest.groupAndAggregate(ds)
res.show(false)

+----------+-----------------------------------------------+
|key       |$anon$1($line5460910223.$read$$iw$$iw$InputRow)|
+----------+-----------------------------------------------+
|[1, false]|94                                             |
|[1, true] |39                                             |
|[0, true] |85                                             |
+----------+-----------------------------------------------+
...