Как я могу эффективно объединить фрейм данных в спарке с каталогом небольших файлов? - PullRequest
0 голосов
/ 07 февраля 2020

У меня есть фрейм данных (df1) со столбцами id, date, type. Мне нужно присоединиться к нему с файлом /descriptions/{id}.txt. Результатом объединения должен быть фрейм данных id, date, type, description для всех записей в (df1).

Цель состоит в том, чтобы использовать этот предварительно обработанный фрейм данных для дальнейшего анализа, чтобы мне не приходилось иметь дело с небольшими файлов больше.

Стоит отметить: существует намного больше небольших файлов описания, чем мне нужно (x1000), поэтому я думаю, что какое-то «ленивое соединение» будет более эффективным, чем чтение всех маленьких файлов заранее и соединение затем.

Как бы вы построили это в Spark? Я использую scala в настоящее время, но если у вас есть пример python, я думаю, что я мог бы работать и с этим.

1 Ответ

0 голосов
/ 07 февраля 2020

Вариант 1, чтение всех файлов:

Вы можете прочитать описания с помощью wholeTextFiles. Он возвращает RDD, отображающий пути файлов к их содержимому, а затем объединяет результат с вашим фреймом данных.

val descriptions = sc
    .wholeTextFiles(".../descriptions")
    // We need to extract the id from the file path
    .map{ case (path, desc) => {
        val fileName = path.split("/").last
        val id = "[0-9]+".r.findFirstIn(fileName)
        id.get.toLong -> desc
    }}
    .toDF("id", "description")

val result = df1.join(descriptions, Seq("id"))

Вариант 2, считывая только нужные вам файлы

Для этого вы можете использовать binaryFiles. Он создает RDD, который сопоставляет каждый путь к файлу с DataStream. Поэтому файлы не читаются сразу. Затем вы можете выбрать все отдельные id из df1, объединить их с RDD и затем читать только содержимое нужных вам файлов. Код будет выглядеть так:

val idRDD = df1
    .select("id").distinct
    .rdd.map(_.getAs[Long]("id") -> true)

val descriptions = sc.binaryFiles(".../descriptions")
    // same as before, but the description is not read yet
    .map{ case (path, descFile) => {
        val fileName = path.split("/").last
        val id = "[0-9]+".r.findFirstIn(fileName)
        id.get.toLong -> descFile
    }} // inner join with the ids we are interested in
    .join(idRDD)
    .map{ case(id, (file, _)) => id -> file}
    // reading the files
    .mapValues(file => {
         val reader = scala.io.Source.fromInputStream(file.open)
         val desc = reader.getLines.mkString("\n")
         reader.close
         desc
    })
    .toDF("id", "description")

val result = df1.join(descriptions, Seq("id"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...