Обнуляемость в схемах Spark sql является рекомендательной по умолчанию.Каков наилучший способ строго соблюдать его? - PullRequest
1 голос
/ 14 мая 2019

Я работаю над простым проектом ETL, который читает файлы CSV, выполняет некоторые изменения в каждом столбце, а затем записывает результат в виде JSON. Я хотел бы, чтобы последующие процессы читали мои результаты. быть уверенным, что мой результат соответствует согласованная схема, но моя проблема в том, что даже если я определю моя схема ввода с nullable = false для всех полей, нули могут красться в и повредить мои выходные файлы, и, кажется, нет никакого (производительного) способа, которым я могу заставьте Spark применять 'not null' для моих полей ввода.

Похоже, что это особенность, как указано ниже в Spark, The Definition Guide:

при определении схемы, в которой объявлено, что все столбцы не имеют нулевые значения, Spark не будет применять это и с радостью выпустит нулевое значение значения в этот столбец. Обнуляемый сигнал просто помочь Spark SQL оптимизировать для обработки этого столбца. Если у вас есть нулевые значения в столбцы, которые не должны иметь нулевые значения, вы можете получить неверные приведите или увидите странные исключения, которые трудно отладить.

Я написал небольшую утилиту для проверки каждой строки информационного кадра и вызвать ошибку, если в любом из столбцов обнаружены пустые значения (на любом уровне вложение в случае полей или подполей, таких как карта, структура или массив.)

Мне интересно, в частности: Я ИЗОБРЕТАЛ КОЛЕСО С ЭТОЙ ПРОВЕРКОЙ? Существуют ли какие-либо библиотеки, или Спарк методы, которые сделали бы это для меня (в идеале, лучше, чем я реализовал)?

Ниже показана утилита проверки и упрощенная версия моего конвейера. Как представлено, призыв к Проверка утилиты закомментирована. Если вы запускаете без включенной утилиты проверки, вы увидите этот результат в /tmp/output.csv.

cat /tmp/output.json/*
(one + 1),(two + 1)
3,4
"",5

Вторая строка после заголовка должна быть числом, но это пустая строка (это то, как спарк пишет ноль, я думаю.) Этот вывод будет проблематичным для нижестоящие компоненты, которые читают вывод моей работы ETL: эти компоненты просто хотят целые числа.

Теперь я могу включить проверку, не комментируя строку

   //checkNulls(inDf)

Когда я делаю это, я получаю исключение, которое информирует меня о недопустимом нулевом значении и печатает из всей строки оскорбления, как это:

        java.lang.RuntimeException: found null column value in row: [null,4]

Один из возможных альтернативных подходов, приведенный в Spark / Definitive Guide *

Spark, в «Полном руководстве» упоминается возможность сделать это:

<dataframe>.na.drop() 

Но это (AFAIK) будет молча отбрасывать плохие записи, а не отмечать плохие. Затем я мог бы сделать «set вычитание» на входе до и после удаления, но это похоже на сильный удар по производительности, чтобы узнать, что является нулевым, а что нет. На первый взгляд, я бы предпочитаю мой метод .... Но я все еще задаюсь вопросом, может ли быть какой-то лучший выход там. Полный код приведен ниже. Спасибо!

package org

import java.io.PrintWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// before running, do; rm -rf /tmp/out* /tmp/foo*
object SchemaCheckFailsToExcludeInvalidNullValue extends App {

  import NullCheckMethods._

  //val input = "2,3\n\"xxx\",4"          // this will be dropped as malformed
  val input = "2,3\n,4"                   // BUT.. this will be let through

  new PrintWriter("/tmp/foo.csv") { write(input); close }

  lazy val sparkConf = new SparkConf()
    .setAppName("Learn Spark")
    .setMaster("local[*]")
  lazy val sparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .getOrCreate()
  val spark = sparkSession

  val schema = new StructType(
    Array(
      StructField("one", IntegerType, nullable = false),
      StructField("two", IntegerType, nullable = false)
    )
  )

  val inDf: DataFrame =
    spark.
      read.
      option("header", "false").
      option("mode", "dropMalformed").
      schema(schema).
      csv("/tmp/foo.csv")

  //checkNulls(inDf)

  val plusOneDf = inDf.selectExpr("one+1", "two+1")
  plusOneDf.show()

  plusOneDf.
    write.
    option("header", "true").
    csv("/tmp/output.csv")

}

object NullCheckMethods extends Serializable {

  def checkNull(columnValue: Any): Unit = {
    if (columnValue == null)
      throw new RuntimeException("got null")
    columnValue match {
      case item: Seq[_] =>
        item.foreach(checkNull)
      case item: Map[_, _] =>
        item.values.foreach(checkNull)
      case item: Row =>
        item.toSeq.foreach {
          checkNull
        }
      case default =>
        println(
          s"bad object [ $default ] of type: ${default.getClass.getName}")
    }
  }

  def checkNulls(row: Row): Unit = {
    try {
      row.toSeq.foreach {
        checkNull
      }
    } catch {
      case err: Throwable =>
        throw new RuntimeException(
          s"found null column value in row: ${row}")
    }
  }


  def checkNulls(df: DataFrame): Unit = {
    df.foreach { row => checkNulls(row) }
  }
}

1 Ответ

1 голос
/ 14 мая 2019

Вы можете использовать встроенный метод Row anyNull , чтобы разделить фрейм данных и обработать оба разделения по-разному:

val plusOneNoNulls = plusOneDf.filter(!_.anyNull)
val plusOneWithNulls = plusOneDf.filter(_.anyNull)

Если вы не планируете осуществлять ручной процесс обработки нулей, использование встроенных методов DataFrame.na проще, поскольку в нем уже реализованы все обычные способы автоматической обработки нулей (например, отбрасывание или заполнение). их со значениями по умолчанию).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...