У меня есть задание Spark, написанное на Python, которое получает странное поведение при проверке ошибок в своих данных. Упрощенная версия ниже:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, DoubleType
from pyspark.sql.functions import col, lit
spark = SparkSession.builder.master("local[3]").appName("pyspark-unittest").getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
SCHEMA = StructType([
StructField("headerDouble", DoubleType(), False),
StructField("ErrorField", StringType(), False)
])
dataframe = (
spark.read
.option("header", "true")
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "ErrorField")
.schema(SCHEMA).csv("./x.csv")
)
total_row_count = dataframe.count()
print("total_row_count = " + str(total_row_count))
errors = dataframe.filter(col("ErrorField").isNotNull())
errors.show()
error_count = errors.count()
print("errors count = " + str(error_count))
CSV, который он читает, просто:
headerDouble
wrong
Соответствующий вывод это
total_row_count = 1
+------------+----------+
|headerDouble|ErrorField|
+------------+----------+
| null| wrong|
+------------+----------+
errors count = 0
Теперь, как это возможно? Если в фрейме данных есть запись, как считается 0? Это ошибка в инфраструктуре Spark или я что-то упустил?
РЕДАКТИРОВАТЬ: Похоже, это может быть известной ошибкой в Spark 2.2, которая была исправлена в Spark 2.3 - https://issues.apache.org/jira/browse/SPARK-21610