Справочная информация:
В настоящее время у меня загружаются большие файлы в AWS S3, эти файлы содержат символы новой строки в некоторых столбцах, что приводит к их неправильному чтению.Однако файлы имеют очень специфический разделитель столбцов ~#~
.Чтобы удалить неправильные переводы строк, я в данный момент направляю файлы через клей aws и объединяю каждую строку со следующей строкой, если в текущей строке нет нужного количества столбцов.
Пример:
возьмем строку: "val1"~#~"va\nl\n2"~#~"val3"
, которая будет представлена следующим образом
"val1"~#~"va
l
2"~#~"val3"
, переходя строка за строкой, используя:
colnum=3
for row in f:
while not len(row.split('~#~'))==colnum:
row += next(f)
cleanrow = row.replace('\n','. ')+'\n
пример очистки будетвернуть пример в одну строку, например: Ожидаемый результат :
"val1"~#~"va. l. 2"~#~"val3"
Проблема:
В настоящее время это происходитслишком долго для потоковой передачи этих больших файлов через компьютер для их очистки, даже если компьютер находится в сети AWS.Поэтому я рассмотрел использование pyspark для этого, я пытался установить пользовательский символ новой строки, например, spark._jsc.hadoopConfiguration().set("textinputformat.record.delimiter","\"\n")
, но проблема в том, что оказывается, что мы можем иметь '"\n'
и в текстовом поле, то есть не все строкиисправляется.Я новичок в pyspark, поэтому не совсем уверен, с чего начать.Я пробовал map
, flatMap
и reduce
, но мне кажется, что это не то, что мне нужно, поскольку они либо используют только текущую строку, либо объединяют все строки в одну.Самое близкое, что я нашел на SO, это этот пост , который использует функцию sliding
, но вопрос немного отличается от того, чего я пытаюсь достичь, и я не могу найти какую-либо документацию этогоодин в писпарке, только скала.
Приветствуются другие предложения о том, как решить проблему с новой линией, используя другие инструменты, которые могут быть реализованы в AWS glue и не предполагающие потоковую передачу набора данных.(Файл слишком большой, чтобы поместиться в памяти)