В каталог bad_record можно помещать записи, длина которых не равна длине заголовка. - PullRequest
0 голосов
/ 27 августа 2018

Я читаю файл в кадре данных, как это

val df = spark.read
   .option("sep", props.inputSeperator)
   .option("header", "true")
   .option("badRecordsPath", "/mnt/adls/udf_databricks/error")
   .csv(inputLoc)

Файл настроен так

col_a|col_b|col_c|col_d
1|first|last|
2|this|is|data
3|ok
4|more||stuff
5|||

Теперь spark будет читать все это как приемлемые данные. Однако я хочу, чтобы 3|ok был отмечен как плохая запись, потому что ее размер не соответствует размеру заголовка. Возможно ли это?

Ответы [ 2 ]

0 голосов
/ 27 августа 2018

Приведенный ниже код поддерживается реализацией блоков данных в spark. Я не вижу отображения схемы в вашем коде. Можете ли вы сопоставить это и попробовать?

.option("badRecordsPath", "/mnt/adls/udf_databricks/error")

Измените свой код, как показано ниже,

val customSchema = StructType(Array(
    StructField("col_a", StringType, true),
    StructField("col_b", StringType, true),
    StructField("col_c", StringType, true),
    StructField("col_d", StringType, true)))

val df = spark.read
   .option("sep", props.inputSeperator)
   .option("header", "true")
   .option("badRecordsPath", "/mnt/adls/udf_databricks/error")
   .schema(customSchema)
   .csv(inputLoc)

Более подробную информацию вы можете получить Документ Datbricks на badrecordspath

Спасибо, Karthick

0 голосов
/ 27 августа 2018
val a = spark.sparkContext.textFile(pathOfYourFile)
val size = a.first.split("\\|").length
a.filter(i => i.split("\\|",-1).size != size).saveAsTextFile("/mnt/adls/udf_databricks/error")
...