Как обнаружить изменение в схеме файла CSV в Spark - PullRequest
0 голосов
/ 25 апреля 2020

Если в моем входящем CSV-файле есть изменения, как мы можем справиться с этим в spark?

Предположим, в первый день я получил CSV-файл со схемой и данными, как показано ниже,

FirstName LastName Age
Sagar     Patro    26
Akash     Nayak    22
Amar      Kumar    18

И на 10-й день моя схема входящего CSV-файла изменилась, как показано ниже

FirstName LastName Mobile     Age 
Sagar     Patro    8984159475 26  
Akash     Nayak    9040988503 22  
Amar      Kumar    9337856871 18  

Мои требования № 1,

Я хочу чтобы знать, есть ли какие-либо изменения в схеме моего входящего CSV-файла.

Мои требования № 2,

Я хочу игнорировать эти новые добавленные столбцы и продолжить с моей предыдущей схемой, т.е. данными схемы Day-1.

Мои требования № 3,

Я также хочу автоматически добавить новую схему, если в схеме произошли изменения к моим входящим данным csv, т.е. схема 10-го дня

1 Ответ

0 голосов
/ 25 апреля 2020
import org.apache.spark.sql.DataFrame

object SchemaDiff {

  def main(args: Array[String]): Unit = {
    // Just because its a simple CSV not considering column data type changes
    val df1 : DataFrame = null // Dataframe for yesterday's data
    val df2 : DataFrame = null // Dataframe for today's data
    val deltaColumnNames = df2.columns.diff(df1.columns)
    val ignoreSchemaChange = true
    if(!deltaColumnNames.isEmpty) {
      println("Schema change")
    }
    val resultDf = if(ignoreSchemaChange) {
      df2.toDF(df1.columns: _*) // Maintain yesterday's schema
    } else {
      df2  // Use updated schema
    }

  }
}

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...