У меня есть следующие классы дел с чертой, они расширены от
package com.mypackage.spark.event
case class TypedEvent(id: String, timestamp: Long, `type`: String)
sealed trait Event{
def id: String
def timestamp: Long
}
case class CreationEvent(id: String, timestamp: Long) extends Event
case class DeleteEvent(id: String, timestamp: Long) extends Event
Мне нужно преобразовать набор данных указанного типа TypedEvent
в другой тип набора данных, который расширяется от Event trait
с использованием transform в классе набора данных с механизмом сопоставления с образцом, как показано ниже (я использую Spark 2.3.1):
import spark.implicits._
val jsonDF = spark.read.json(pathToJsonFile)
val typedEventsDS = jsonDF.select("id", "timestamp", "type").as[TypedEvent]
val eventTypes = Array("CreateEvent", "DeleteEvent" , ...)
eventTypes.foreach(eventType => {
val result = typedEventsDS.filter($"type" <=> eventType)
.transform(featurize(spark, eventType)) // line 61
/**
* ...
*/
})
def featurize (spark: SparkSession, eventType: String): Dataset[TypedEvent] => Dataset[_ <: Event] = dataset => {
import spark.implicits._
eventType match {
case "CreateEvent" => dataset.as[CreationEvent]
case "DeleteEvent" => dataset.as[DeleteEvent]
...
}
}
Метод featurize
должен возвращать набор данных любого типа, который расширяет черту Event.
Но это не компилируется с ошибкой:
Error:(61, 12) no type parameters for method transform: (t:
org.apache.spark.sql.Dataset[com.mypackage.spark.event.TypedEvent] =>
org.apache.spark.sql.Dataset[U])org.apache.spark.sql.Dataset[U] exist so
that it can be applied to arguments
org.apache.spark.sql.Dataset[com.mypackage.spark.event.TypedEvent] =>
org.apache.spark.sql.Dataset[_ <: com.mypackage.spark.event.Event])
--- because ---
argument expression's type is not compatible with formal parameter type;
found:
org.apache.spark.sql.Dataset[com.mypackage.spark.event.TypedEvent] =>
org.apache.spark.sql.Dataset[_ <: com.mypackage.spark.event.Event]
required:
org.apache.spark.sql.Dataset[com.mypackage.spark.event.TypedEvent] =>
org.apache.spark.sql.Dataset[?U]
.transform(featurize(spark, eventType))
Итак, я попытался добавить параметр типа в сам метод transfrom
следующим образом:
.transform[_ <: Event](featurize(eventType))
, но это привело к другой ошибке компиляции: Error:(61, 22) unbound wildcard type .transform[_ <: Event](featurize(spark, eventType))
Также попытался сделатьМетод featurize Generic:
def featurize[T <: Event](spark: SparkSession, eventType: String): Dataset[TypedEvent] => Dataset[T] =
dataset => { /* ... same ... */}
, но получил type mismatch
в первом case
предложении
У меня ничего не получилось, кроме удаления [_ <: ]
из featurize
метода и возврата точного типа(т.е. CreationEvent) Я действительно не знаю, что происходит с Scala Generics.Есть идеи?