Это может быть глупый вопрос, но я боролся в течение довольно долгого времени.Это действительно похоже на этот вопрос , но я не смог применить его в своем коде (дуэт к шаблонам или быть функцией).
Я хочу передать flatMap (или карту) преобразовать функцию в аргумент функции и затем передать ее в стратегическую функцию, которая фактически вызывает метод df.rdd.flatMap.Я попытаюсь объяснить!
case class Order(id: String, totalValue: Double, freight: Double)
case class Product(id: String, price: Double)
... or any other case class, whatever one needs to transform a row into ...
Класс Entity:
class Entity(path: String) = {
...
def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Entity = {
this.getStrategy.flatMap[T](mapFunction)
return this
}
def save(path: String): Unit = {
... write logic ...
}
}
У Entity могут быть разные стратегии для своих методов.EntityStrategy выглядит следующим образом:
abstract class EntityStrategy(private val entity: Entity,
private val spark: SparkSession) {
...
def flatMap[T](mapFunction: (Row) => ArrayBuffer[T])
def map[T](mapFunction: (Row) => T)
}
И один пример реализации EntityStrategy:
class SparkEntityStrategy(private val entity: Entity, private val spark: SparkSession)
extends EntityStrategy(entity, spark) {
...
override def map[T](mapFunction: Row => T): Unit = {
val rdd = this.getData.rdd.map(f = mapFunction)
this.dataFrame = this.spark.createDataFrame(rdd)
}
override def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Unit = {
var rdd = this.getData.rdd.flatMap(f = mapFunction)
this.dataFrame = this.spark.createDataFrame(rdd)
}
}
Наконец, я хотел бы создать функцию flatMap / map и вызвать ее так:
def transformFlatMap(row: Row): ArrayBuffer[Order] = {
var orders = new ArrayBuffer[Order]
var _deliveries = row.getAs[Seq[Row]]("deliveries")
_deliveries.foreach(_delivery => {
var order = Order(
id = row.getAs[String]("id"),
totalValue = _delivery.getAs("totalAmount").asInstanceOf[Double])
orders += order
})
return orders
}
val entity = new Entity("path")
entity.flatMap[Order](transformFlatMap).save("path")
Конечно, это не работает.Я получаю сообщение об ошибке на SparkEntityStrategy:
Ошибка: (95, 35) Недоступен ClassTag для T val rdd = this.getData.rdd.map (f = mapFunction)
Я пытался добавить (implicit encoder: Encoder: T)
к методам сущностей и стратегий, но это было бесполезно.Вероятно, сделал что-то не так, поскольку я новичок в Scala.
Если я уберу буквы "Т" и передам реальный класс дела, все будет хорошо.