Функция Spark .count () отличается от содержимого фрейма данных при фильтрации по полю поврежденной записи - PullRequest
0 голосов
/ 01 мая 2018

У меня есть задание Spark, написанное на Python, которое получает странное поведение при проверке ошибок в своих данных. Упрощенная версия ниже:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, DoubleType
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.master("local[3]").appName("pyspark-unittest").getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")


SCHEMA = StructType([
    StructField("headerDouble", DoubleType(), False),
    StructField("ErrorField", StringType(), False)
])

dataframe = (
    spark.read
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "ErrorField")
    .schema(SCHEMA).csv("./x.csv")
)

total_row_count = dataframe.count()
print("total_row_count = " + str(total_row_count))

errors = dataframe.filter(col("ErrorField").isNotNull())
errors.show()
error_count = errors.count()
print("errors count = " + str(error_count))

CSV, который он читает, просто:

headerDouble
wrong

Соответствующий вывод это

total_row_count = 1
+------------+----------+
|headerDouble|ErrorField|
+------------+----------+
|        null|     wrong|
+------------+----------+

errors count = 0

Теперь, как это возможно? Если в фрейме данных есть запись, как считается 0? Это ошибка в инфраструктуре Spark или я что-то упустил?

РЕДАКТИРОВАТЬ: Похоже, это может быть известной ошибкой в ​​Spark 2.2, которая была исправлена ​​в Spark 2.3 - https://issues.apache.org/jira/browse/SPARK-21610

1 Ответ

0 голосов
/ 02 мая 2018

Спасибо @ user6910411 - похоже, ошибка. Я поднял проблему в трекере ошибок проекта Spark.

Я предполагаю, что Spark запутался из-за присутствия ErrorField в схеме, которая также указывается в качестве столбца ошибок и используется для фильтрации кадра данных.

Между тем, мне кажется, я нашел обходной путь для подсчета строк данных с разумной скоростью:

def count_df_with_spark_bug_workaround(df):
    return sum(1 for _ in df.toLocalIterator())

Не совсем уверен, почему это дает правильный ответ, когда .count() не работает.

Jira билет, который я поднял: https://issues.apache.org/jira/browse/SPARK-24147

Это оказалось дубликатом: https://issues.apache.org/jira/browse/SPARK-21610

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...