При загрузке CSV-файла, определяющего схему, где некоторые поля помечены nullable = false
, я ожидаю, что строки, содержащие нулевые значения для указанных столбцов, будут удалены или отфильтрованы из набора данных при определении режима * 1002.*.С моей стороны это может быть неправильным пониманием того, что именно считается искаженным, но в любом случае меня смущает то, как код продолжает работать, когда схема явно определяет определенные поля как не принимающие нулевые значения.
Глядя на документы по базам данных для программы чтения CSV (я знаю, что эта функциональность теперь напрямую добавлена в Apache Spark, но я не могу найти документацию по ней), это означает, что схема должна учитываться при чтении значений.
https://github.com/databricks/spark-csv#features
DROPMALFORMED: отбрасывает строки, которые имеют меньше или больше токенов, чем ожидалось , или токены, которые не соответствуют схеме
Пример:
CSV-файл:
value1,value2,,value4
Spark-код (в Scala):
val spark = SparkSession
.builder()
.appName("example")
.master("local")
.getOrCreate()
spark.read.
.format("csv")
.schema(StructType(
StructField("col1", StringType),
StructField("col2", StringType),
StructField("col3", StringType, nullable = false),
StructField("col4", StringType)))
.option("header", false)
.option("mode", "DROPMALFORMED")
.load("path/to/file.csv")
Приведенный выше код по-прежнему будет содержать строку, содержащую нользначение для col3
, и на самом деле, если я хочу отфильтровать записи с нулевым значением, я должен сделать следующее:
.filter(row => !row.isNullAt(row.fieldIndex("col3")))
Итак, мои вопросы:
1) Была моя какПредположение, что режим DROPMALFORMED отбрасывает данные, не соответствующие схеме, недопустимыми?
2) Я сделал что-то не так в том, как я загружаю CSV, что привело к неожиданному поведению, описанному выше (или, возможно, есть лучший очистительспособ делать то, что я хочу)?
Мой код выглядит практически идентично примерам документации по кирпичам данных и другим примерам загрузки CSV-файлов с использованием Spark, найденным в Интернете.
Я использую Spark 2.3.1
с Scala 2.11.8
.
[править]
Я поднял этот вопрос на JIRA Apache Spark: https://issues.apache.org/jira/browse/SPARK-25545