Передать функцию с любым типом возвращаемого класса case в качестве параметра - PullRequest
0 голосов
/ 22 февраля 2019

Это может быть глупый вопрос, но я боролся в течение довольно долгого времени.Это действительно похоже на этот вопрос , но я не смог применить его в своем коде (дуэт к шаблонам или быть функцией).

Я хочу передать flatMap (или карту) преобразовать функцию в аргумент функции и затем передать ее в стратегическую функцию, которая фактически вызывает метод df.rdd.flatMap.Я попытаюсь объяснить!

case class Order(id: String, totalValue: Double, freight: Double) 
case class Product(id: String, price: Double) 

... or any other case class, whatever one needs to transform a row into ...

Класс Entity:

class Entity(path: String) = {
  ...
  def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Entity = {
      this.getStrategy.flatMap[T](mapFunction)
      return this
  }
  def save(path: String): Unit = {
      ... write logic ...
  } 
}

У Entity могут быть разные стратегии для своих методов.EntityStrategy выглядит следующим образом:

abstract class EntityStrategy(private val entity: Entity,
                              private val spark: SparkSession) {
  ...
  def flatMap[T](mapFunction: (Row) => ArrayBuffer[T])
  def map[T](mapFunction: (Row) => T)
}

И один пример реализации EntityStrategy:

class SparkEntityStrategy(private val entity: Entity, private val spark: SparkSession)
  extends EntityStrategy(entity, spark) {
  ...
  override def map[T](mapFunction: Row => T): Unit = {
    val rdd = this.getData.rdd.map(f = mapFunction)
    this.dataFrame = this.spark.createDataFrame(rdd)
  }

  override def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Unit = {
    var rdd = this.getData.rdd.flatMap(f = mapFunction)
    this.dataFrame = this.spark.createDataFrame(rdd)
  }
}

Наконец, я хотел бы создать функцию flatMap / map и вызвать ее так:

def transformFlatMap(row: Row): ArrayBuffer[Order] = {
    var orders = new ArrayBuffer[Order]
    var _deliveries = row.getAs[Seq[Row]]("deliveries")
    _deliveries.foreach(_delivery => {
       var order = Order(
           id = row.getAs[String]("id"),
           totalValue = _delivery.getAs("totalAmount").asInstanceOf[Double])
      orders += order
    })
   return orders
}

val entity = new Entity("path")
entity.flatMap[Order](transformFlatMap).save("path")

Конечно, это не работает.Я получаю сообщение об ошибке на SparkEntityStrategy:

Ошибка: (95, 35) Недоступен ClassTag для T val rdd = this.getData.rdd.map (f = mapFunction)

Я пытался добавить (implicit encoder: Encoder: T) к методам сущностей и стратегий, но это было бесполезно.Вероятно, сделал что-то не так, поскольку я новичок в Scala.

Если я уберу буквы "Т" и передам реальный класс дела, все будет хорошо.

1 Ответ

0 голосов
/ 23 февраля 2019

Получается, чтобы и методы компилятора, и Spark были удовлетворены. Мне нужно было добавить следующие теги типа:

[T <: scala.Product : ClassTag : TypeTag]

Итак, оба метода стали:

def map[T <: Product : ClassTag : TypeTag](mapFunction: (Row) => T): Entity
def flatMap[T <: scala.Product : ClassTag : TypeTag](mapFunction: (Row) => TraversableOnce[T]): Entity

О scala.Product :

Базовая черта для всех продуктов, которые в стандартной библиотеке включают как минимум scala.Product1 через scala.Product22 и, следовательно, также их подклассы scala.Tuple1 - scala.Tuple22.Кроме того, все классы case реализуют Product с помощью синтетически сгенерированных методов.

Поскольку я использую объект класса case в качестве возвращаемого типа моей функции, мне потребовался scala.Product , чтобы Spark createDataFrame мог соответствовать правильной перегрузке.

Почему оба ClassTag и TypeTag ?

При удалении TypeTag компилятор выдает следующую ошибку:

Ошибка: (96,48) Тип Tag недоступен для T this.dataFrame = this.spark.createDataFrame (rdd)

И удаление ClassTag :

Ошибка: (95, 35) Недоступен ClassTag для T val rdd = this.getData.rdd.map (f = mapFunction)

При их добавлении оба метода были удовлетворены, и все работало какожидается.

Найдена хорошая статья , объясняющая стирание типа в Scala.

...