Сохранение и загрузка файлов всего текста с помощью Spark RDD - PullRequest
0 голосов
/ 26 мая 2020

Мне нужно выполнить пакетную обработку некоторых текстовых файлов в Spark. В основном кто-то дал мне тонны искаженных файлов csv. Они содержат много строк данных заголовка в произвольном текстовом формате, а затем много строк правильно отформатированных данных csv. Мне нужно разделить эти данные на два файла или хотя бы как-то избавиться от заголовка.

В любом случае, я читал, что вы можете получить RDD в формате:

[(имя файла, содержимое)]

с использованием

spark \ .sparkContext \ .wholeTextFiles (input_files_csv)

Я хотел бы для выполнения операции сопоставления в этом RDD, которая приводит к другому формату, точно так же, как исходный

[(newfilename, content)]

Затем я хотел бы сохранить кластер это содержимое под этими именами файлов.

Мне не удалось найти команду записи, которая сделает это за меня. Я могу сохранить необработанный RDD, но я не могу сохранить его как обычные файлы, которые потом я смогу прочитать как фреймы данных.

Я полагаю, я мог бы удалить заголовки, а затем сохранить как один гигантский CSV с именем файла в качестве новой колонки, но я чувствую, что это будет не так эффективно.

Есть ли у кого-нибудь решение моей проблемы?

1 Ответ

1 голос
/ 27 мая 2020

Это Scala, но не должно быть очень далеко в Python. Внутри "foreach" я не использую ничего специфичного для искры c для записи файлов, только обычные Had oop API.

sc.wholeTextFiles("/tmp/test-data/")
  .foreach{ x =>
    val filename = x._1
    val content = x._2
    val fs = FileSystem.get(new Configuration())
    val output = fs.create(new Path(s"${filename}-copy"))
    val writer = new PrintWriter(output)
    writer.write(content)
    writer.close
  }
...