Метод преобразования Spark Dataset с универсальным типом возврата не компилируется - PullRequest
0 голосов
/ 03 марта 2019

У меня есть следующие классы дел с чертой, они расширены от

package com.mypackage.spark.event

case class TypedEvent(id: String, timestamp: Long, `type`: String)

sealed trait Event{
  def id: String
  def timestamp: Long
}

case class CreationEvent(id: String, timestamp: Long) extends Event

case class DeleteEvent(id: String, timestamp: Long) extends Event

Мне нужно преобразовать набор данных указанного типа TypedEvent в другой тип набора данных, который расширяется от Event trait с использованием transform в классе набора данных с механизмом сопоставления с образцом, как показано ниже (я использую Spark 2.3.1):

import spark.implicits._

val jsonDF = spark.read.json(pathToJsonFile)
val typedEventsDS = jsonDF.select("id", "timestamp", "type").as[TypedEvent]

val eventTypes = Array("CreateEvent", "DeleteEvent" , ...)

eventTypes.foreach(eventType => {
  val result = typedEventsDS.filter($"type" <=> eventType)
.transform(featurize(spark, eventType)) // line 61
  /**
   * ...
   */
})

def featurize (spark: SparkSession, eventType: String): Dataset[TypedEvent] => Dataset[_ <: Event] = dataset => {

  import spark.implicits._

  eventType match {
    case "CreateEvent" => dataset.as[CreationEvent]
    case "DeleteEvent" => dataset.as[DeleteEvent]
    ...
  }
}

Метод featurize должен возвращать набор данных любого типа, который расширяет черту Event.

Но это не компилируется с ошибкой:

Error:(61, 12) no type parameters for method transform: (t:
org.apache.spark.sql.Dataset[com.mypackage.spark.event.TypedEvent] =>
org.apache.spark.sql.Dataset[U])org.apache.spark.sql.Dataset[U] exist so 
that it can be applied to arguments 
org.apache.spark.sql.Dataset[com.mypackage.spark.event.TypedEvent] =>
org.apache.spark.sql.Dataset[_ <: com.mypackage.spark.event.Event])
--- because ---
argument expression's type is not compatible with formal parameter type;

found:
org.apache.spark.sql.Dataset[com.mypackage.spark.event.TypedEvent] =>
org.apache.spark.sql.Dataset[_ <: com.mypackage.spark.event.Event]
required:
org.apache.spark.sql.Dataset[com.mypackage.spark.event.TypedEvent] =>
org.apache.spark.sql.Dataset[?U]
      .transform(featurize(spark, eventType))

Итак, я попытался добавить параметр типа в сам метод transfrom следующим образом:

.transform[_ <: Event](featurize(eventType)), но это привело к другой ошибке компиляции: Error:(61, 22) unbound wildcard type .transform[_ <: Event](featurize(spark, eventType))

Также попытался сделатьМетод featurize Generic:

  def featurize[T <: Event](spark: SparkSession, eventType: String): Dataset[TypedEvent] => Dataset[T] = 
dataset => { /* ... same ... */}

, но получил type mismatch в первом case предложении

У меня ничего не получилось, кроме удаления [_ <: ] из featurize метода и возврата точного типа(т.е. CreationEvent) Я действительно не знаю, что происходит с Scala Generics.Есть идеи?

...