У меня есть большой файл с неструктурированными данными, который мне нужно немного почистить, прежде чем я смогу поместить все в искровой фреймворк. Например, у меня есть файл данных, как показано ниже:
garbage line
garbage line
headers below
name
age
gender
headers above
garbage line
data below
ben,23,male
april,18,female
chris,26,male
data above
Этот файл содержит как метаданные файла, так и фактические строки данных. Если бы я собирался использовать Python, я бы получил индекс строки для headers below
, headers above
, data below
, data above
, а затем нарезал бы список строк, чтобы получить заголовки и строки данных, чтобы встроить его в pandas. Как это лучше всего сделать в Pyspark? Приведенный ниже код - это то, что я получил до сих пор, задаваясь вопросом, есть ли более простой способ сделать это.
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sc)
rdd = sc.textFile('some_unstrutured_data.txt')
indicators = ['headers below', 'headers above', 'data below', 'data above']
row_pointers = rdd.zipWithIndex().filter(lambda r: r[0] in indicators).map(lambda r: r[1]).collect()
header_start, header_end, data_start, data_end = row_pointers[0], row_pointers[1], row_pointers[2], row_pointers[3]
headers = rdd.zipWithIndex().filter(lambda r: header_start < r[1] < header_end).map(lambda r: r[0]).collect()
data = rdd.zipWithIndex().filter(lambda r: data_start < r[1] < data_end).map(lambda r: tuple(r[0].split('|')))
df = data.toDF(headers)