Я хочу прочитать RDD[String]
с помощью устройства чтения CSV. Причина, по которой я это делаю, заключается в том, что мне нужно отфильтровать некоторые записи перед использованием CSV-ридера.
val fileRDD: RDD[String] = spark.sparkContext.textFile("file")
Мне нужно прочитать файл RDD с помощью устройства чтения CSV. Я не хочу фиксировать файл, поскольку он увеличивает IO HDFS. Я изучил варианты, которые есть у нас в CSV, но не нашел.
spark.read.csv(file)
Пример данных
PHM|MERC|PHARMA|BLUEDRUG|50
CLM|BSH|CLAIM|VISIT|HSA|EMPLOYER|PAID|250
PHM|GSK|PHARMA|PARAC|70
CLM|UHC|CLAIM|VISIT|HSA|PERSONAL|PAID|72
Как видите, все записи начинаются с того, что PHM имеет разное количество столбцов, а clm имеет разное количество столбцов. Вот почему я фильтрую, а затем применяю схему. Записи PHM и CLM имеют разные схемы.
val fileRDD: RDD[String] = spark.sparkContext.textFile("file").filter(_.startWith("PHM"))
spark.read.option(schema,"phcschema").csv(fileRDD.toDS())