Как проверить, совпадают ли имена столбцов и связанные с ними данные в spark scala - PullRequest
0 голосов
/ 27 февраля 2020

Предположим, у меня есть несколько столбцов, как показано ниже:

EMP_ID, EMP_NAME, EMP_CONTACT  
1, SIDDHESH, 544949461

Теперь я хочу проверить, синхронизируются ли данные c со схемой имен столбцов. Для EMP_NAME данные в этом столбце должны быть только string. Я попробовал приведенный ниже код после ссылки на эту ссылку, но она показывает ошибку в последней строке моего кода.

package com.sample
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
class sample1 {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  val data = spark.read.format("csv").option("header", "true").load("C:/Users/siddheshk2/Desktop/words.txt")
  val originalSchema = data.schema

  def validateColumns(row: Row): Row = {
    val emp_id = row.getAs[String]("EMP_ID")
    val emp_name = row.getAs[String]("EMP_NAME")
    val emp_contact = row.getAs[String]("EMP_CONTACT")

    // do checking here and populate (err_col,err_val,err_desc) with values if applicable

    Row.merge(row)
  }
  val validateDF = data.map { row => validateColumns(row) }

}  

Итак, он не принимает последнюю строку моего кода val validateDF = data.map { row => validateColumns(row) }. Как мне это решить? Или есть какой-либо другой эффективный способ решения моей проблемы?

Я ввел недопустимую запись (3-ю), как показано ниже:

EMP_ID,EMP_NAME,EMP_CONTACT
1,SIDDHESH,99009809
2,asdadsa, null
sidh,sidjh,1232  

В этом случае я ввел string значение для столбца id (которое должно быть числом), поэтому после проверки схемы столбца с ее данными должно выдаваться сообщение об ошибке, указывающее, что запись не соответствует схеме столбца.

Ответы [ 2 ]

1 голос
/ 27 февраля 2020

Класс строки имеет свойство схема . Вы можете использовать его, просматривая столбцы и сравнивая их. Для этого вы можете использовать оператор == или один из методов сравнения схем, описанных здесь .

Тогда метод проверки может выглядеть следующим образом:

def isValid(row: Row): Boolean = originalSchema == row.schema
1 голос
/ 27 февраля 2020

Вы только что пропустили преобразование вашего DataFrame в rdd, чтобы применить операцию .map, попробуйте вместо этого:

import org.apache.spark.sql.Row    
val validateDF = data.rdd.map { row => validateColumns(row) }

И если вы хотите преобразовать его обратно в DataFrame, просто используйте для этого sparkSession :

val newSchema = // specify the schema of the new dataframe
val updatedDF = spark.createDataFrame(validateDF, newSchema)
...