import java.io.Serializable
import java.sql.Date
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, StringType}
// Field names must match the Elasticsearch attribute names; the declared
// types drive the column casts in changeColType below.
case class CaseClass(id: String = "", name: Double = 0.0d, doj: Date = Date.valueOf("1999-01-01"), sal: Double = 0.0d) extends Serializable
object DataTypeConversion extends SessionFactoryTest with Serializable {

  import spark.implicits._

  def main(args: Array[String]): Unit = {
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")
    val df = sc.parallelize(Seq(
      (101, "1", "2019-08-22T19:26:10.000+0000", "123.45"),
      (102, "2", "2019-08-08T00:00:14.352", "456.78"),
      (103, "3", "2019-08-08T00:01:22.161", "890.12"),
      (101, "1", "2019-08-08T01:12:14.991", "1122.45"),
      (101, "3", "2019-08-08T01:12:21.977", "221.33"),
      (102, "1", "2019-08-08T01:13:09.331", "123.45"),
      (102, "3", "2019-08-08T01:17:10.461", "890.12"),
      (102, "2", "2019-08-08T01:17:15.152", "890.12"),
      (102, "2", "2019-08-08T01:23:09.78", "123.45"),
      (101, "1", "2019-08-08T01:24:10.977", "890.12"),
      (101, "1", "2019-08-08T01:25:17.221", "123.45")
    )).toDF("id", "name", "doj", "sal")
    // Cast the raw columns to the types declared on CaseClass.
    val res = changeColType(df, CaseClass())
    res.printSchema()
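    // Expected column types after the cast: id string, name double,
    // doj string (date_format returns a formatted string), sal double.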
    res.show(false)
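    // Hedged sketch of the eventual push to Elasticsearch (assumes the
    // elasticsearch-hadoop connector on the classpath and es.nodes/es.port
    // set on the session; the index name "smartbattery/doc" is a placeholder):
    // import org.elasticsearch.spark.sql._
    // res.saveToEs("smartbattery/doc")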
  }
  // Cast each DataFrame column to the Java type of the field with the same
  // name on the case class, so the frame matches the Elasticsearch mapping.
  def changeColType[T](df: DataFrame, classType: T): DataFrame = {
    df.dtypes.foldLeft(df) { case (acc, (colName, _)) =>
      // Reflect the declared field type; this throws NoSuchFieldException
      // if the DataFrame carries a column the case class does not declare.
      val targetType = classType.getClass.getDeclaredField(colName).getType.getName
      targetType match {
        case "int"              => acc.withColumn(colName, col(colName).cast(IntegerType))
        case "java.lang.String" => acc.withColumn(colName, col(colName).cast(StringType))
        case "java.sql.Date"    => acc.withColumn(colName, date_format(col(colName), "yyyy-MM-dd'T'HH:mm:ss"))
        case "float"            => acc.withColumn(colName, col(colName).cast(FloatType))
        case "double"           => acc.withColumn(colName, col(colName).cast(DoubleType))
        case _                  => acc
      }
    }
  }
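
  // A minimal alternative sketch (an assumption, not the original flow):
  // derive the target schema from the case class with a Spark Encoder and
  // cast columns to the Catalyst types directly. Unlike changeColType, doj
  // would come out as a real DateType column, not a formatted string.
  def changeColTypeViaEncoder(df: DataFrame): DataFrame = {
    import org.apache.spark.sql.Encoders
    val targetSchema = Encoders.product[CaseClass].schema
    targetSchema.fields.foldLeft(df) { (acc, field) =>
      // Only cast columns that actually exist in the incoming frame.
      if (acc.columns.contains(field.name)) acc.withColumn(field.name, col(field.name).cast(field.dataType))
      else acc
    }
  }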
}
trait SessionFactoryTest {
  val spark: SparkSession = SparkSession.builder()
    .appName("SmartBattery ES Push")
    .config("spark.driver.allowMultipleContexts", true)
    .master("local[*]")
    .getOrCreate()
}