Spark: загрузка CSV с разными столбцами - PullRequest
0 голосов
/ 26 марта 2019

Я загружаю файл CSV, используя загрузчик CSV Spark, и преобразую его в конкретную Dataset, предоставляя схему case class и используя .as[T].

spark.read
  .option("header", "false")
  .option("dateFormat", "yyyy-MM-dd HH:mm:ss.SSS")
  .schema(schemaOf[T])
  .csv(filePath)
  .as[T]

Мой вопрос здесь, у меня есть несколько систем, отправляющих один и тот же файл, и скажем, если одна система отправляет файл, содержащий менее двух столбцов из моего определенного schema тогда я хотел бы просто поставить null для этих двух столбцов и загрузить все остальные столбцы.

И для всех других систем загружать все поля при отправке в соответствии с schema.

Как мне сделать это эффективным способом? Я не хочу создавать case class для каждой системы.

1 Ответ

2 голосов
/ 26 марта 2019

Вы можете обработать данные в формате csv как Dataframe, а затем преобразовать в Dataset. Таким образом, вы можете легко добавлять / удалять столбцы в соответствии с вашим классом дел с помощью таких служебных функций, как:

implicit class DataFrameOps(df: DataFrame) {
  def withColumnIfNotExists(colName: String, col: Column): DataFrame = {
    if(df.columns.contains(colName)) df
    else df.withColumn(colName, col)
  }
}

// then use it like this
???.csv(filePath).withColumnIfNotExists("missing_col", lit(null).cast("string"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...