Ваши данные не в формате CSV.CSV означает разделенный запятыми текстовый файл с фиксированной схемой.CSV для ваших данных будет выглядеть следующим образом:
abc,x1,x2,x3,,
def,x1,x3,x4,x8,x9
ghi,x7,x10,x11,,
Обратите внимание на запятые в строках 1 и 3, которых нет в ваших данных.
Поскольку у вас есть текстовый файл, который не являетсяCSV, способ добраться до схемы, которую вы хотите в Spark, - это прочитать весь файл в Python, проанализировать то, что вы хотите, и затем использовать spark.crateDataFrame()
.В качестве альтернативы, если у вас есть несколько таких файлов в каталоге, используйте SparkContext.wholeTextFiles
, а затем flatMap
вашу функцию синтаксического анализа.
Если вы уже сделали что-то вроде open("Your File.txt").readlines
, остальное просто:
import re
from pyspark.sql import *
lines = [
"abc, x1, x2, x3",
"def, x1, x3, x4,x8,x9",
"ghi, x7, x10, x11"
]
split = re.compile("\s*,\s*")
Line = Row("id", "first", "rest")
def parse_line(id, line):
tokens = split.split(line.strip)
return Line(id, tokens[0], tokens.pop(0))
def parse_lines(lines):
return [parse_line(i, x) for i,x in enumerate(lines)]
spark.createDataFrame(parse_lines(lines))