Я пытаюсь объединить заголовок в один выходной файл как csv ( ref от @Kang)
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StringType, StructType}
object ListOfSavingFiltered {
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
def main(args: Array[String]): Unit = {
val url = "jdbc:sqlserver://localhost;databaseName=InsightWarehouse;integratedSecurity=true";
val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
val v_Account = "dbo.v_Account"
val v_Customer = "dbo.v_Customer"
val spark = SparkSession.
//.config("spark.debug.maxToStringFields", "100")
.appName("Insight Application Big Data")
val dfAccount = spark
.option("url", url)
.option("driver", driver)
.option("dbtable", v_Account)
val dfCustomer = spark
.option("url", url)
.option("driver", driver)
.option("dbtable", v_Customer)
val Classification = Seq("Contractual Account", "Non-Term Deposit", "Term Deposit")
val joined = dfAccount.as("a")
Seq("BusinessDate", "CustomerID"), "LEFT")
dfAccount.col("BusinessDate") === "2018-11-28"
&& dfAccount.col("Category") === "Deposit"
// && dfAccount.col("IsActive").equalTo("Yes")
&& dfAccount.col("Classification").isin(Classification: _*)
val columnNames = Seq[String](
val outputfile = "src/main/resources/out/"
var filename = "lifOfSaving.csv.gz"
var outputFileName = outputfile + "/temp_" + filename
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob = outputFileName
val responseWithSelectedColumns = joined.select(columnNames.map(c => col(c)): _*)
.withColumn("RollOverStatus", when(col("RollOverStatus").equalTo("Y"), "Yes").otherwise("No"))
//create a new data frame containing only header names
import scala.collection.JavaConverters._
val headerDF = spark.createDataFrame(List(Row.fromSeq(responseWithSelectedColumns.columns.toSeq)).asJava, responseWithSelectedColumns.schema)
//merge header names with data
// .coalesce(1) //So just a single part- file will be created
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.option("charset", "UTF8")
.option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") //Avoid creating of crc files
.option("header", "false") //Write the header
merge(mergeFindGlob, mergedFileName)
Код кажется правильным, но все равно выдается сообщение об ошибке, как показано ниже:
Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Date
at org.apache.spark.sql.catalyst.CatalystTypeConverters$DateConverter$.toCatalystImpl(CatalystTypeConverters.scala:300)
Кто-нибудь, пожалуйста, помогите?