Pyspark RDD объединяет текущую строку со следующей, пока длина текущей строки не достигнет x - PullRequest
0 голосов
/ 19 декабря 2018

Справочная информация:

В настоящее время у меня загружаются большие файлы в AWS S3, эти файлы содержат символы новой строки в некоторых столбцах, что приводит к их неправильному чтению.Однако файлы имеют очень специфический разделитель столбцов ~#~.Чтобы удалить неправильные переводы строк, я в данный момент направляю файлы через клей aws и объединяю каждую строку со следующей строкой, если в текущей строке нет нужного количества столбцов.

Пример:

возьмем строку: "val1"~#~"va\nl\n2"~#~"val3", которая будет представлена ​​следующим образом

"val1"~#~"va
l
2"~#~"val3"

, переходя строка за строкой, используя:

colnum=3
for row in f:
    while not len(row.split('~#~'))==colnum:
        row += next(f)
cleanrow = row.replace('\n','. ')+'\n

пример очистки будетвернуть пример в одну строку, например: Ожидаемый результат :

"val1"~#~"va. l. 2"~#~"val3"

Проблема:

В настоящее время это происходитслишком долго для потоковой передачи этих больших файлов через компьютер для их очистки, даже если компьютер находится в сети AWS.Поэтому я рассмотрел использование pyspark для этого, я пытался установить пользовательский символ новой строки, например, spark._jsc.hadoopConfiguration().set("textinputformat.record.delimiter","\"\n"), но проблема в том, что оказывается, что мы можем иметь '"\n' и в текстовом поле, то есть не все строкиисправляется.Я новичок в pyspark, поэтому не совсем уверен, с чего начать.Я пробовал map, flatMap и reduce, но мне кажется, что это не то, что мне нужно, поскольку они либо используют только текущую строку, либо объединяют все строки в одну.Самое близкое, что я нашел на SO, это этот пост , который использует функцию sliding, но вопрос немного отличается от того, чего я пытаюсь достичь, и я не могу найти какую-либо документацию этогоодин в писпарке, только скала.

Приветствуются другие предложения о том, как решить проблему с новой линией, используя другие инструменты, которые могут быть реализованы в AWS glue и не предполагающие потоковую передачу набора данных.(Файл слишком большой, чтобы поместиться в памяти)

1 Ответ

0 голосов
/ 08 января 2019

Мне удалось решить мою проблему

#first I read in the data
rdd = spark.sparkContext.textFile(MessyFile)

#the first line is expected to have the correct number of columns (no linebreaks within a column)
cols = len(rdd.first().split("~#~"))

#I store the already "correct" rows in one RDD, and the incorrect ones in a different RDD
correct = rdd.filter(lambda x: len(x.split("~#~"))==cols)
wrong = rdd.filter(lambda x: len(x.split("~#~"))!=cols)

#The incorrect rows are now so small that they will fit in memory, so I can make RDD into an iterable list
fix = iter(wrong.collect())
fixed = []

#I then iterate over all the rows in the incorrect list and add next row until the row has the expected number of columns, and I add ". " to indicate where there was a linebreak
#The new rows are added to a new list called fixed
for item in fix:
    row = item
    while len(row.split("~#~"))!=cols:
        row+='. '+next(fix)
    fixed.append(row)

#I then union the already correct rows with the newly fixed rows
new = correct.union(spark.sparkContext.parallelize(fixed)) \
        .map(lambda row: row.split("~#~"))

#I then create a dataframe, assing the first row as header and write it out as a parquet file
header = new.first()
df = new.filter(lambda line: line != header).toDF()
oldcols = df.columns

df = reduce(lambda df, idx:df.withColumnRenamed(oldcols[idx],header[idx]),range(len(oldcols)),df)

df.coalesce(10).write.parquet(CleanFile,mode='overwrite') 

Единственные проблемы, о которых я могу думать выше, - это если число неправильных строк будет больше, чем может поместиться в памяти (маловероятно), или есть строкаразрыв в первом или последнем столбце (маловероятно в моих файлах)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...