Я работаю над простым проектом ETL, который читает файлы CSV, выполняет
некоторые изменения в каждом столбце, а затем записывает результат в виде JSON.
Я хотел бы, чтобы последующие процессы читали мои результаты.
быть уверенным, что мой результат соответствует
согласованная схема, но моя проблема в том, что даже если я определю
моя схема ввода с nullable = false для всех полей, нули могут красться
в и повредить мои выходные файлы, и, кажется, нет никакого (производительного) способа, которым я могу
заставьте Spark применять 'not null' для моих полей ввода.
Похоже, что это особенность, как указано ниже в Spark, The Definition Guide:
при определении схемы, в которой объявлено, что все столбцы не имеют
нулевые значения, Spark не будет применять это и с радостью выпустит нулевое значение
значения в этот столбец. Обнуляемый сигнал просто помочь Spark
SQL оптимизировать для обработки этого столбца. Если у вас есть нулевые значения в
столбцы, которые не должны иметь нулевые значения, вы можете получить неверные
приведите или увидите странные исключения, которые трудно отладить.
Я написал небольшую утилиту для проверки каждой строки информационного кадра и
вызвать ошибку, если в любом из столбцов обнаружены пустые значения (на любом уровне
вложение в случае полей или подполей, таких как карта, структура или массив.)
Мне интересно, в частности: Я ИЗОБРЕТАЛ КОЛЕСО С ЭТОЙ ПРОВЕРКОЙ? Существуют ли какие-либо библиотеки, или
Спарк методы, которые сделали бы это для меня (в идеале, лучше, чем я реализовал)?
Ниже показана утилита проверки и упрощенная версия моего конвейера. Как представлено, призыв к
Проверка утилиты закомментирована. Если вы запускаете без включенной утилиты проверки, вы увидите этот результат в
/tmp/output.csv.
cat /tmp/output.json/*
(one + 1),(two + 1)
3,4
"",5
Вторая строка после заголовка должна быть числом, но это пустая строка
(это то, как спарк пишет ноль, я думаю.) Этот вывод будет проблематичным для
нижестоящие компоненты, которые читают вывод моей работы ETL: эти компоненты просто хотят целые числа.
Теперь я могу включить проверку, не комментируя строку
//checkNulls(inDf)
Когда я делаю это, я получаю исключение, которое информирует меня о недопустимом нулевом значении и печатает
из всей строки оскорбления, как это:
java.lang.RuntimeException: found null column value in row: [null,4]
Один из возможных альтернативных подходов, приведенный в Spark / Definitive Guide *
Spark, в «Полном руководстве» упоминается возможность сделать это:
<dataframe>.na.drop()
Но это (AFAIK) будет молча отбрасывать плохие записи, а не отмечать плохие.
Затем я мог бы сделать «set вычитание» на входе до и после удаления, но это похоже на
сильный удар по производительности, чтобы узнать, что является нулевым, а что нет. На первый взгляд, я бы
предпочитаю мой метод .... Но я все еще задаюсь вопросом, может ли быть какой-то лучший выход там.
Полный код приведен ниже. Спасибо!
package org
import java.io.PrintWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// before running, do; rm -rf /tmp/out* /tmp/foo*
object SchemaCheckFailsToExcludeInvalidNullValue extends App {
import NullCheckMethods._
//val input = "2,3\n\"xxx\",4" // this will be dropped as malformed
val input = "2,3\n,4" // BUT.. this will be let through
new PrintWriter("/tmp/foo.csv") { write(input); close }
lazy val sparkConf = new SparkConf()
.setAppName("Learn Spark")
.setMaster("local[*]")
lazy val sparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val spark = sparkSession
val schema = new StructType(
Array(
StructField("one", IntegerType, nullable = false),
StructField("two", IntegerType, nullable = false)
)
)
val inDf: DataFrame =
spark.
read.
option("header", "false").
option("mode", "dropMalformed").
schema(schema).
csv("/tmp/foo.csv")
//checkNulls(inDf)
val plusOneDf = inDf.selectExpr("one+1", "two+1")
plusOneDf.show()
plusOneDf.
write.
option("header", "true").
csv("/tmp/output.csv")
}
object NullCheckMethods extends Serializable {
def checkNull(columnValue: Any): Unit = {
if (columnValue == null)
throw new RuntimeException("got null")
columnValue match {
case item: Seq[_] =>
item.foreach(checkNull)
case item: Map[_, _] =>
item.values.foreach(checkNull)
case item: Row =>
item.toSeq.foreach {
checkNull
}
case default =>
println(
s"bad object [ $default ] of type: ${default.getClass.getName}")
}
}
def checkNulls(row: Row): Unit = {
try {
row.toSeq.foreach {
checkNull
}
} catch {
case err: Throwable =>
throw new RuntimeException(
s"found null column value in row: ${row}")
}
}
def checkNulls(df: DataFrame): Unit = {
df.foreach { row => checkNulls(row) }
}
}