Неявный класс не работает для Generi c Type - PullRequest
0 голосов
/ 01 февраля 2020

Я пытаюсь использовать Неявный класс для упрощения загрузки данных в таблицу улья (с помощью Spark)

Скажем, например: Если это мои данные,

case class Name(name: String, age: Int)
val lst = List(Name("name1", 24), Name("name2", 25))

Я хочу загрузить данные в таблицу следующим образом lst.loadToTable(db = "somedb", table = "sometable")

Вот что я сделал до сих пор, чтобы достичь вышеуказанного ^ используя Implicit class,

object TableHelper {

    implicit class TableUtil[X: ClassTag](lst: Seq[X]){

        def loadToTable(db: String, table: String)(implicit spark: SparkSession): Unit ={
            println("Table -> " + table)
            spark.catalog.tableExists(s"${db}.${table}") match {
                case true =>
                    println(s"Table exist. Not creating new one: $table")
                case false =>
                    println(s"Table DO NOT exist. Creating new one: $table")
                    // Create database
                    val ddl = s"CREATE DATABASE IF NOT EXISTS ${db}"
                    println("ddl -> " + ddl)
                    spark.sql(ddl)
                    // Create DataFrame
                    import spark.implicits._
                    val df = spark.sparkContext.parallelize(lst).toDF() // Compiler ERROR
                    // TODO: Pending: Create table & Load data
            }
        }

    }

}

Проблема Я получаю эту ошибку компилятора,

Error:(27, 66) value toDF is not a member of org.apache.spark.rdd.RDD[X]
                    val df = spark.sparkContext.parallelize(lst).toDF()

Кажется, проблема в том, что toDF() не работает при использовании типа generi c (где в моем коде lst равно Seq[X]). Любая идея, что нужно сделать, чтобы исправить эту проблему?

1 Ответ

1 голос
/ 03 февраля 2020

Наконец-то я могу исправить код (не уверен, почему мой вопрос был опущен. Будет гораздо больше смысла, если будет указана причина). Спасибо @Worakarn Isaratham за руководство. Код потребовал некоторых дополнительных изменений по сравнению с тем, что указано здесь

import org.apache.spark.sql.SparkSession
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

object TableHelper {
    // FIX is X <: Product : ClassTag : TypeTag    
    implicit class TableUtil[X <: Product : ClassTag : TypeTag](lst: Seq[X]){

        def loadToTable(db: String, table: String)(implicit spark: SparkSession): Unit ={
            println("Table -> " + table)
            spark.catalog.tableExists(s"${db}.${table}") match {
                case true =>
                    println(s"Table exist. Not creating new one: $table")
                    // TODO: Pending: Load data    
                case false =>
                    println(s"Table DO NOT exist. Creating new one: $table")
                    // Create database
                    val ddl = s"CREATE DATABASE IF NOT EXISTS ${db}"
                    println("ddl -> " + ddl)
                    spark.sql(ddl)
                    // Create DataFrame
                    import spark.implicits._
                    val df = spark.sparkContext.parallelize(lst).toDF()
                    df.show(100, false)
                // TODO: Pending: Create table & Load data
            }
        }

    }

}

[X <: Product : ClassTag : TypeTag] сделали свое дело. ClassTag необходимо для parallelize method

  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {

X <: Product, вероятно, говорит ... тип Generi c, который я собираюсь использовать, реализует Product признак (Все case class es реализует Product )

TypeTag необходим для метода toDF, чтобы предоставить доступ к стертым типам (во время выполнения)

... и теперь Я могу упростить соглашение о вызовах следующим образом

val lst = List(Name("name1", 24), Name("name2", 25))
lst.loadToTable("somedb", "sometable")

и вот результат

+-----+---+
|name |age|
+-----+---+
|name1|24 |
|name2|25 |
+-----+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...