Вы можете напрямую использовать фрейм данных Spark с использованием различных режимов анализа:
mode : по умолчанию это РАЗРЕШИТЕЛЬНЫЙ.
Возможные значения:
PERMISSIVE : попытаться проанализировать все строки: для отсутствующих токенов вставлены пустые значения, а дополнительные токены игнорируются.
DROPMALFORMED : отбросить строки, которые содержат меньше или больше токенов, чем ожидалось, или токены, которые не соответствуют схеме.
FAILFAST : прервать с RuntimeException, если обнаружена какая-либо некорректная строка.
Файл -
debo bangalore 3 4 5 6 7 8 9 Yes
debo banaglore 4 5 6 7 8 9 10 Yes
abhi Delhi 6 7 9 10 99 99 00 No
alex Newyork
Код -
import org.apache.spark.sql.types.{StructType,StringType,StructField}
val fileHeader = "name address client_add result_code bytes req_method url user hierarchy_code type"
val columnSize = fileHeader.split(" ").size
val schema= StructType(fileHeader.split(" ").map(field=>StructField(field,StringType,true)))
val csvFilePath = "tmp/data/data_with_empty_lines.txt"
val non_empty_df = spark.read.option("header", "false")
.option("delimiter", " ")
.option("inferSchema", "true")
.schema(schema)
.option("mode", "DROPMALFORMED")
.csv(csvFilePath)
non_empty_df.show()
Результат -
+----+---------+----------+-----------+-----+----------+---+----+--------------+----+
|name| address|client_add|result_code|bytes|req_method|url|user|hierarchy_code|type|
+----+---------+----------+-----------+-----+----------+---+----+--------------+----+
|debo|bangalore| 3| 4| 5| 6| 7| 8| 9| Yes|
|debo|banaglore| 4| 5| 6| 7| 8| 9| 10| Yes|
|abhi| Delhi| 6| 7| 9| 10| 99| 99| 00| No|
+----+---------+----------+-----------+-----+----------+---+----+--------------+----+
Для получения дополнительной информации о чтении файлов CSV - https://docs.databricks.com/data/data-sources/read-csv.html