Я использую Spark 2.2
в кластере hadoop 2.6.3
.
У меня есть сотни CSV-файлов, которые мне нужно обработать, каждый из которых содержит от 10 до 60 столбцов.
Для каждого CSV-файла схема хранится в JSON-файле, который я могу прочитать и преобразовать в spark.sql.types.StructField
объект.
Мои файлы не чистые, и почти в каждой строке есть хотя бы одно нулевое значение.
Пример. Поэтому, когда я загружаю свой CSV-файл без схемы:
df = spark.read.option("header","false").option("delimiter",del).csv(filename)
, я получаю это:
| _c01| _c02| _c03|_c04|
+----------+---------+-------+----+
| 1| 1234| test|null|
| 2| null| test| 10|
| 3| 1256| test| 9|
| 4| null| null| 8|
Поэтому, когда я загружаю свой CSV-файл со схемой
schema = createSparkSchemaFromJsonSchema(json_filename)
df = spark.read.option("header","false").option("delimiter",del).csv(filename, schema = schema)
df.printSchema
df.show()
Я везде получаю ноль:
root
|-- student_id: integer(nullable = false)
|-- school_id: integer(nullable = false)
|-- teacher: string (nullable = false)
|-- rank: integer(nullable = false)
|student_id|school_id|teacher|rank|
+----------+---------+-------+----+
| null| null| null|null|
| null| null| null|null|
| null| null| null|null|
| null| null| null|null|
Я пробовал несколько вариантов, таких как option("nullValue", "null")
, но это не сработало
Я знаю, что делает что-то вроде df.select(col("colname").cast(IntegerType))
для каждого столбца должно работать, но я хочу, чтобы мой код был общим (сотни файлов), а не обрабатывать каждый столбец.
У кого-то есть идеи по этому поводу?