У меня есть папка с более чем 100000 "csv" файлами. К сожалению, эти CSV-файлы еще не в правильном формате, поэтому я должен удалить около 80 строк заголовка.
Апли Схема, затем
("mode", "DROPMALFORMED")
И добавьте 4 новых столбца, с
df = df.withColumn ("blabla", lit (blabla))
Информация о новых столбцах я рисую из названия файла.
Все идет нормально.
Но в списке ссылок, которые я впервые создал из папки:
HDFS: // сервер :: 42 / проекты / банан / живой / csv1id4.csv
HDFS: // сервер :: 42 / проекты / банан / живой / csv2id5.csv
hdfs: // сервер :: 42 / projects / banana / live / csv3id6.csv
...
HDFS: // сервер :: 42 / проекты / банан / живой / csv100000id8484848.csv
Я хочу вызвать функцию для каждой ссылки, которая:
читает в файле csv как фрейм данных
создает 4 новых столбца и перетаскивает содержимое из файла. Имя
и записывает созданный dataFrame в файл паркета
(df.write.mode('append').parquet('hdfs:///projects/parquet
)
Очевидно, что все это не работает параллельно для "для каждого цикла 'ссылки' do"
Также пытаются сохранить список ссылок на rdd и сделать что-то вроде:
rdd.map(Lambda x: doFunction(x))
У кого-нибудь есть идея или совет?
Большое спасибо заранее!