Передача класса case в аргументы функции - PullRequest
0 голосов
/ 03 декабря 2018

извините за простой вопрос.Я хочу передать класс case в аргумент функции, и я хочу использовать его дальше внутри функции.До сих пор я пробовал это с TypeTag и ClassTag, но по какой-то причине я не могу правильно использовать его или, может быть, я не смотрю в правильном месте.

Варианты использования - это что-то похожее наэто:

case class infoData(colA:Int,colB:String)
case class someOtherData(col1:String,col2:String,col3:Int)

def readCsv[T:???](path:String,passedCaseClass:???): Dataset[???] = {
  sqlContext
    .read
    .option("header", "true")
    .csv(path)
    .as[passedCaseClass]
}

Это будет называться примерно так:

val infoDf = readCsv("/src/main/info.csv",infoData)
val otherDf = readCsv("/src/main/someOtherData.csv",someOtherData)

Ответы [ 2 ]

0 голосов
/ 03 декабря 2018

Есть две вещи, на которые следует обратить внимание:

  1. имена классов должны быть в CamelCase, поэтому InfoData.
  2. После того, как вы связали тип сDataSet, это не DataFrame.DataFrame - это специальное имя для DataSet общего назначения Row.

Вам нужно убедиться, что у вашего предоставленного класса есть неявный экземпляр соответствующего Encoder в текущей области видимости.

case class InfoData(colA: Int, colB: String)

Encoder экземпляров для примитивных типов (Int, String и т. Д.) И case classes можно получить, импортировав spark.implicits._

def readCsv[T](path: String)(implicit encoder: Encoder: T): Dataset[T] = {
  spark
    .read
    .option("header", "true")
    .csv(path)
    .as[T]
}

Или, вы можете использовать контекстное ограничение,

def readCsv[T: Encoder[T]](path: String): Dataset[T] = {
  spark
    .read
    .option("header", "true")
    .csv(path)
    .as[T]
}

Теперь вы можете использовать его следующим образом,

val spark = ...

import spark.implicits._

def readCsv[T: Encoder[T]](path: String): Dataset[T] = {
  spark
    .read
    .option("header", "true")
    .csv(path)
    .as[T]
}

val infoDS = readCsv[InfoData]("/src/main/info.csv")
0 голосов
/ 03 декабря 2018

Сначала измените определение вашей функции на:

object t0 {
    def readCsv[T] (path: String)(implicit spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
      spark
        .read
        .option("header", "true")
        .csv(path)
        .as[T]
    }
}

Вам не нужно выполнять какое-либо отражение для создания универсальной функции readCsv.Ключевым моментом здесь является то, что Spark нужен кодер во время компиляции.Таким образом, вы можете передать его как неявный параметр, и компилятор добавит его.

Поскольку Spark SQL может десериализовать типы продуктов (ваши классы дел), включая кодировщики по умолчанию, вашу функцию легко вызвать как:

case class infoData(colA: Int, colB: String)
case class someOtherData(col1: String, col2: String, col3: Int)

object test {
  import t0._

  implicit val spark = SparkSession.builder().getOrCreate()

  import spark.implicits._
  readCsv[infoData]("/tmp")

}

Надеюсь, это поможет

...