Я читаю CSV-файл как DF, в котором последние 3 обязательных столбца содержат значения NULL CSV-файл
Может кто-нибудь подсказать, как использовать оператор if else в UDF в scala загорается таким образом, что, если в этих столбцах есть какие-либо нулевые значения, он должен выдать ошибку, указывающую, что «Обязательные поля не могут быть пустыми»? Я написал код в scala spark, поэтому будет полезно, если я получу какие-либо предложения.
Это мой первый код, поэтому, пожалуйста, прости мои ошибки. Из приведенного ниже кода, пожалуйста, объясните мне, как получить подробности таким образом, чтобы, если какое-либо из условий IF не удовлетворяло, задание не выполнялось, и журналы регистрировались с сообщением об ошибке, приведенным в выражении ELSE, если условия удовлетворяют результирующий DF должен быть вставлен в базу данных. Пожалуйста, поделитесь вашими предложениями:
import java.util.Date
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
object InputValidation {
val conf: SparkConf = new SparkConf()
.setAppName("Excel to DataFrame")
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val spark: SparkSession = SparkSession.builder()
.appName("Excel to DataFrame")
.config("spark.master", "local")
.getOrCreate()
val structType: StructType = {
val sno = StructField("S.No", IntegerType, nullable = true)
val fname = StructField("Firm Name", StringType, nullable = true)
val address = StructField("Address", StringType, nullable = true)
val country = StructField("Country", StringType, nullable = true)
val pcode = StructField("Post Code", IntegerType, nullable = true)
val tnumber = StructField("Telephone Number", IntegerType, nullable = true)
val waddress = StructField("Web Address", StringType, nullable = true)
val mail = StructField("Mail ID", StringType, nullable = true)
val fstatus = StructField("Firm Status", StringType, nullable = false)
val btype = StructField("Business Type", StringType, nullable = false)
val edate = StructField("Effective Date", DateType, nullable = false)
new StructType(Array(sno, fname, address, country, pcode, tnumber, waddress, mail,
fstatus, btype, edate))
}
def main(args: Array[String]): Unit = {
val inputDF: DataFrame = spark.read
.schema(structType)
.option("header", "true")
.option("delimiter", ",")
.csv("G:\\CSV\\FirmRegistration.csv")
def isValidUDF: Object = udf({
(fstatus: String, btype: String, edate: Date) => {
val validfs = Seq("New", "Authorised", "EEA Authorised", "Cancelled")
if (validfs.contains(fstatus)) {
return fstatus
}
else {
throw new Exception("Incorrect firm status")
}
val validbt = Seq("Regulated", "PSD", "EEA")
if (validbt.contains(btype)) {
return btype
}
else {
throw new Exception("Incorrect firm business type")
}
if (edate != null) {
return edate
}
else {
throw new Exception("Effective date cannot be NULL")
}
}
})
val userDF = udf(isValidUDF _)
val resultDF = inputDF.withColumn("IsValid", userDF())
resultDF.show()
//Load the result as a table into Database
val driver = "org.postgresql.Driver"
val url = "jdbc:postgressql://localhost:5432/rtjvm"
val user = "docker"
val password = "docker"
inputDF.write
.format("jdbc")
.option("driver",driver)
.option("url",url)
.option("user",user)
.option("password",password)
.option("dbtable","public.input")
.save()
}
}
Если я запускаю вышеуказанную программу, я получаю сообщение об ошибке: Исключение в потоке "main" java .lang.UnsupportedOperationException: схема для типа Object не поддерживается