Почему фрейм данных не генерирует исключение RunTimeException с опцией "FAILFAST" в спарк? - PullRequest
0 голосов
/ 02 июня 2019

У меня есть следующий входной файл, в котором могут быть плохие записи, я хочу создать исключение и определить столбцы с именами столбцов, которые не соответствуют моей пользовательской схеме. Согласно моему пониманию, фрейм данных должен немедленно вызвать исключение, даже если мы не вызываем никаких действий с ним.

1, а, 10000,11-03-2019, Пуна

2, б, 10020,14-03-2019, Пуна

3, а, 34567,15-03-2019, Пуна

tyui, а, FGH-03-2019, Пуна

4, б, 10020,14-03-2019, Пуна

Я попытался установить опцию "FAILFAST" для фрейма данных spark, но он не вызывает каких-либо исключений на моем конце.

Я попробовал приведенный ниже код.

SparkSession ss = SparkSession.builder().appName("Data Quality Frameowrk")
    .master("local")
    .getOrCreate();

    try {
    StructField[] fields = new StructField[5];
    fields[0] = new StructField("id", DataTypes.IntegerType, false,Metadata.empty());
    fields[1] = new StructField("name", DataTypes.StringType, false,Metadata.empty());
    fields[2] = new StructField("salary", DataTypes.DoubleType, false,Metadata.empty());
    fields[3] = new StructField("dob", DataTypes.DateType, false,Metadata.empty());
    fields[4] = new StructField("loc", DataTypes.StringType, false,Metadata.empty());
    StructType customSchema = new StructType(fields);

    ss.read().format("csv")
            .schema(customSchema)
            .option("mode", "FAILFAST")
            .load("C:\\\\Users\\\\manoj.dhake\\\\Downloads\\\\softwares\\\\neo4jdata\\\\employee.csv");


    }catch(Exception e) {
        System.out.println("want to catch column name ,due to which error has been occured");
        e.printStackTrace();
    }

Примечание. Программа должна иметь возможность захватывать имя столбца в случае несоответствия типов данных и продолжать выполнение потока выполнения (не должно быть ненормально завершено).

1 Ответ

1 голос
/ 02 июня 2019

Это потому, что Spark ленив, он даже не читает данные при вызове load, и только обработка фрейма данных вызовет фактическое чтение. Согласно документации

FAILFAST: выдает исключение при обнаружении поврежденных записей.

Так что это не имеет ничего общего с тем, чтобы сделать нагрузку нетерпеливой. Проверка может быть выполнена с нетерпением, запустив обработку вручную, но это приведет к тому, что все данные будут обработаны дважды, если все записи верны. Влияние на производительность может быть несколько смягчено с помощью cache:

val df = spark.read
  .schema(StructType(Seq(StructField("test", IntegerType))))
  .option("mode", "FAILFAST")
  .csv(Seq("a").toDS())
  .cache()
df.count()

бросит

aorg.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...