Как проверить схему DataFrame? - PullRequest
0 голосов
/ 11 октября 2018

У меня есть DataFrame df с некоторыми данными, которые являются результатом процесса расчета.Затем я сохраняю этот DataFrame в базе данных для дальнейшего использования.

Например:

val rowsRDD: RDD[Row] = sc.parallelize(
  Seq(
    Row("first", 2.0, 7.0),
    Row("second", 3.5, 2.5),
    Row("third", 7.0, 5.9)
  )
)

val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val1", DoubleType, true))
  .add(StructField("val2", DoubleType, true))

val df = spark.createDataFrame(rowsRDD, schema)

Мне нужно проверить, что все столбцы в окончательном DataFrame соответствуют определенным типам данных.Конечно, одним из способов является создание DataFrame с использованием схемы (как в приведенном выше примере).Однако в некоторых случаях изменения могут иногда вноситься в типы данных в процессе вычисления - после создания исходного DataFrame (например, когда была изменена некоторая формула, примененная к DataFrame).

Поэтому я хочуперепроверить, что final DataFrame соответствует исходной схеме.Если это не соответствует, то я хотел бы применить соответствующий кастинг.Есть ли способ сделать это?

Ответы [ 4 ]

0 голосов
/ 08 марта 2019

На основе операций с нетипизированными наборами данных из https://spark.apache.org/docs/2.2.0/sql-programming-guide.html, должно быть:

df.printSchema()

0 голосов
/ 11 октября 2018

Вы можете получить схему информационного кадра с помощью метода схемы

df.schema

Определите метод castColumn

def castColumn(df: DataFrame, colName: String, randomDataType: DataType): DataFrame =
    df.withColumn(colName, df.col(colName).cast(randomDataType))

Затем примените этот метод ко всем столбцам, которые нужно преобразовать.

Сначала получите массив кортежей с colName и целевым типом данных

//Assume your dataframes have the same column names, you need to sortBy in case the it is not in the same order

// You can also iterate through dfOrigin.schema only and compare their dataTypes with target dataTypes instead of zipping

val differences = (dfOrigin.schema.fields.sortBy{case (x: StructField) => x.name} zip dfTarget.schema.fields.sortBy{case (x: StructField) => x.name}).collect {
                   case (origin: StructField, target: StructField) if origin.dataType != target.dataType => 
                        (origin.name, target.dataType)
}

Затем

 differences.foldLeft(df) {
      case (acc, value) => castColumn(acc, value._1, value._2)
 }
0 голосов
/ 12 октября 2018

Если я правильно понимаю ваше требование, следующий пример иллюстрирует, как вернуть DataFrame с измененными типами столбцов к его исходной версии:

import org.apache.spark.sql.types._

val df1 = Seq(
  (1, "a", 100L, 10.0), (2, "b", 200L, 20.0)
).toDF("c1", "c2", "c3", "c4")

val df2 = Seq(
  (1, "a", 100, 10.0f), (2, "b", 200, 20.0f)
).toDF("c1", "c2", "c3", "c4")

df2.printSchema
// root
//  |-- c1: integer (nullable = false)
//  |-- c2: string (nullable = true)
//  |-- c3: integer (nullable = false)
//  |-- c4: float (nullable = false)

val fieldsDiffType = (df1.schema.fields zip df2.schema.fields).collect{
  case (a: StructField, b: StructField) if a.dataType != b.dataType =>
    (a.name, a.dataType)
}
// fieldsDiffType: Array[(String, org.apache.spark.sql.types.DataType)] =
//   Array((c3,LongType), (c4,DoubleType))

val df2To1 = fieldsDiffType.foldLeft(df2)( (accDF, field) =>
  accDF.withColumn(field._1, col(field._1).cast(field._2))
)

df2To1.printSchema
// root
//  |-- c1: integer (nullable = false)
//  |-- c2: string (nullable = true)
//  |-- c3: long (nullable = false)
//  |-- c4: double (nullable = false)

Обратите внимание, что это решение работает, только если столбцы DataFrame остаются прежнимипо размеру и порядку, и не распространяется на такие типы, как массив или структура.

[ОБНОВЛЕНИЕ]

Если есть опасения, что порядок столбцов может быть изменен, вы можете сначала заказать df1.schema.fields иdf2.schema.fields перед выполнением zip:

df1.schema.fields.sortBy(_.name) zip df2.schema.fields.sortBy(_.name)
0 голосов
/ 11 октября 2018

Вы можете попробовать

> df.printSchema
root
 |-- id: string (nullable = true)
 |-- val1: double (nullable = true)
 |-- val2: double (nullable = true)

Это напечатает схему в древовидном формате. Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...