Хотите создать универсальный метод утилит c, который преобразует тип Dataframe / Dataset на основе класса case - PullRequest
0 голосов
/ 04 февраля 2020

Хотите создать универсальный c служебный метод, который будет отвечать за изменение типа DataFrame (искры) в зависимости от дизайна класса дела. Этот сценарий поможет при переносе DataFrame в Elasticsearch, поскольку атрибут и тип Elasticsearch должны совпадать с именем и типом столбца DataFrame, иначе он не сможет ввести * ES * 1005 в ES. и ниже это решение.

1 Ответ

0 голосов
/ 04 февраля 2020
import java.io.Serializable
import java.sql.Date
import org.apache.log4j.lf5.LogLevel
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{date_format, _}
import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, StringType}

// Field name should be same as Elasticsearch attribute
case class CaseClass(id: String = "", name: Double = 0.0d, doj: Date = new Date(1999, 0, 0), sal: Double = 0) extends Serializable

object DataTypeConvertion extends SessionFactoryTest with Serializable {

  import spark.implicits._

  def main(args: Array[String]): Unit = {
    val sc = spark.sparkContext
    sc.setLogLevel(LogLevel.WARN.toString)
    val df = sc.parallelize(Seq(
      (101, "1", "2019-08-22T19:26:10.000+0000", "123.45"),
      (102, "2", "2019-08-08T00:00:14.352", "456.78"),
      (103, "3", "2019-08-08T00:01:22.161", "890.12"),
      (101, "1", "2019-08-08T01:12:14.991", "1122.45"),
      (101, "3", "2019-08-08T01:12:21.977", "221.33"),
      (102, "1", "2019-08-08T01:13:09.331", "123.45"),
      (102, "3", "2019-08-08T01:17:10.461", "890.12"),
      (102, "2", "2019-08-08T01:17:15.152", "890.12"),
      (102, "2", "2019-08-08T01:23:09.78", "123.45"),
      (101, "1", "2019-08-08T01:24:10.977", "890.12"),
      (101, "1", "2019-08-08T01:25:17.221", "123.45")
    )).toDF("id", "name", "doj", "sal")

    val res = changeColType(df, CaseClass.apply())
    res.printSchema()
    res.show(false)
  }

  // DF  -> ES based on case class
  def changeColType[T](df: DataFrame, classType: T): DataFrame = {
    val res = df.dtypes.map(cols => (cols._1, classType.getClass.getDeclaredField(cols._1).getType.getName)) :+ ("", "")
    val finalDF = res.foldLeft(res(0), df)((x, y) => {
      val tmpDF = x._1._2 match {
        case "int" => x._2.withColumn(x._1._1, col(x._1._1).cast(IntegerType))
        case "java.lang.String" => x._2.withColumn(x._1._1, col(x._1._1).cast(StringType))
        case "java.sql.Date" => x._2.withColumn(x._1._1, date_format(col(x._1._1), "yyyy-MM-dd'T'HH:mm:ss").as(x._1._1))
        case "float" => x._2.withColumn(x._1._1, col(x._1._1).cast(FloatType))
        case "double" => x._2.withColumn(x._1._1, col(x._1._1).cast(DoubleType))
        case _ => x._2
      }
      (y, tmpDF)
    })
    finalDF._2
  }
}

trait SessionFactoryTest  {
  val spark = SparkSession.builder()
    .appName("SmartBattery ES Push")
    .config("spark.driver.allowMultipleContexts", true)
    .master("local[*]")
    .getOrCreate()
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...