Как использовать оператор if для выдачи ошибки, если значение в столбце dataframe равно NULL? - PullRequest
0 голосов
/ 11 февраля 2020

Я читаю CSV-файл как DF, в котором последние 3 обязательных столбца содержат значения NULL CSV-файл

Может кто-нибудь подсказать, как использовать оператор if else в UDF в scala загорается таким образом, что, если в этих столбцах есть какие-либо нулевые значения, он должен выдать ошибку, указывающую, что «Обязательные поля не могут быть пустыми»? Я написал код в scala spark, поэтому будет полезно, если я получу какие-либо предложения.

Это мой первый код, поэтому, пожалуйста, прости мои ошибки. Из приведенного ниже кода, пожалуйста, объясните мне, как получить подробности таким образом, чтобы, если какое-либо из условий IF не удовлетворяло, задание не выполнялось, и журналы регистрировались с сообщением об ошибке, приведенным в выражении ELSE, если условия удовлетворяют результирующий DF должен быть вставлен в базу данных. Пожалуйста, поделитесь вашими предложениями:

import java.util.Date
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._


object InputValidation {

  val conf: SparkConf = new SparkConf()
    .setAppName("Excel to DataFrame")
    .setMaster("local[*]")

  val sc = new SparkContext(conf)
  sc.setLogLevel("WARN")

  val spark: SparkSession = SparkSession.builder()
    .appName("Excel to DataFrame")
    .config("spark.master", "local")
    .getOrCreate()




 val structType: StructType = {
    val sno = StructField("S.No", IntegerType, nullable = true)
    val fname = StructField("Firm Name", StringType, nullable = true)
    val address = StructField("Address", StringType, nullable = true)
    val country = StructField("Country", StringType, nullable = true)
    val pcode = StructField("Post Code", IntegerType, nullable = true)
    val tnumber = StructField("Telephone Number", IntegerType, nullable = true)
    val waddress = StructField("Web Address", StringType, nullable = true)
    val mail = StructField("Mail ID", StringType, nullable = true)
    val fstatus = StructField("Firm Status", StringType, nullable = false)
    val btype = StructField("Business Type", StringType, nullable = false)
    val edate = StructField("Effective Date", DateType, nullable = false)
    new StructType(Array(sno, fname, address, country, pcode, tnumber, waddress, mail,
      fstatus, btype, edate))
  }
  def main(args: Array[String]): Unit = {
  val inputDF: DataFrame = spark.read
      .schema(structType)
      .option("header", "true")
      .option("delimiter", ",")
      .csv("G:\\CSV\\FirmRegistration.csv")


def isValidUDF: Object = udf({
  (fstatus: String, btype: String, edate: Date) => {
    val validfs = Seq("New", "Authorised", "EEA Authorised", "Cancelled")
    if (validfs.contains(fstatus)) {
      return fstatus
    }
    else {
      throw new Exception("Incorrect firm status")
    }

    val validbt = Seq("Regulated", "PSD", "EEA")
    if (validbt.contains(btype)) {
      return btype
    }
    else {
      throw new Exception("Incorrect firm business type")
    }

    if (edate != null) {
      return edate
    }
    else {
      throw new Exception("Effective date cannot be NULL")
    }
  }
})

  val userDF = udf(isValidUDF _)
  val resultDF = inputDF.withColumn("IsValid", userDF())
  resultDF.show()

//Load the result as a table into Database
    val driver = "org.postgresql.Driver"
    val url = "jdbc:postgressql://localhost:5432/rtjvm"
    val user = "docker"
    val password = "docker"

    inputDF.write
      .format("jdbc")
      .option("driver",driver)
      .option("url",url)
      .option("user",user)
      .option("password",password)
      .option("dbtable","public.input")
      .save()

}

}

Если я запускаю вышеуказанную программу, я получаю сообщение об ошибке: Исключение в потоке "main" java .lang.UnsupportedOperationException: схема для типа Object не поддерживается

1 Ответ

0 голосов
/ 11 февраля 2020
def checkNullUDF = udf({
  (firmStatus: String, businessType: String, effectiveDate: String) => {
    if(firmStatus == null || businessType == null || effectiveDate == null) true else false
  }
})

df.withColumn("IsNull", checkNullUDF($"Firm Status", $"Business Type", $"Effective Date")

Это даст вам столбец «IsNull» со ​​значениями true / false, который вы можете использовать для соответствующего исключения.

...