Как очистить беспорядочный файл данных с помощью pyspark? - PullRequest
0 голосов
/ 30 мая 2020

У меня есть большой файл с неструктурированными данными, который мне нужно немного почистить, прежде чем я смогу поместить все в искровой фреймворк. Например, у меня есть файл данных, как показано ниже:

garbage line
garbage line
headers below
name
age
gender
headers above
garbage line
data below
ben,23,male
april,18,female
chris,26,male
data above

Этот файл содержит как метаданные файла, так и фактические строки данных. Если бы я собирался использовать Python, я бы получил индекс строки для headers below, headers above, data below, data above, а затем нарезал бы список строк, чтобы получить заголовки и строки данных, чтобы встроить его в pandas. Как это лучше всего сделать в Pyspark? Приведенный ниже код - это то, что я получил до сих пор, задаваясь вопросом, есть ли более простой способ сделать это.

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sc)

rdd = sc.textFile('some_unstrutured_data.txt')

indicators = ['headers below', 'headers above', 'data below', 'data above']
row_pointers = rdd.zipWithIndex().filter(lambda r: r[0] in indicators).map(lambda r: r[1]).collect()
header_start, header_end, data_start, data_end = row_pointers[0], row_pointers[1], row_pointers[2], row_pointers[3]

headers = rdd.zipWithIndex().filter(lambda r: header_start < r[1] < header_end).map(lambda r: r[0]).collect()
data = rdd.zipWithIndex().filter(lambda r: data_start < r[1] < data_end).map(lambda r: tuple(r[0].split('|')))

df = data.toDF(headers)
...