Вариант 1, чтение всех файлов:
Вы можете прочитать описания с помощью wholeTextFiles
. Он возвращает RDD, отображающий пути файлов к их содержимому, а затем объединяет результат с вашим фреймом данных.
val descriptions = sc
.wholeTextFiles(".../descriptions")
// We need to extract the id from the file path
.map{ case (path, desc) => {
val fileName = path.split("/").last
val id = "[0-9]+".r.findFirstIn(fileName)
id.get.toLong -> desc
}}
.toDF("id", "description")
val result = df1.join(descriptions, Seq("id"))
Вариант 2, считывая только нужные вам файлы
Для этого вы можете использовать binaryFiles
. Он создает RDD, который сопоставляет каждый путь к файлу с DataStream
. Поэтому файлы не читаются сразу. Затем вы можете выбрать все отдельные id
из df1
, объединить их с RDD и затем читать только содержимое нужных вам файлов. Код будет выглядеть так:
val idRDD = df1
.select("id").distinct
.rdd.map(_.getAs[Long]("id") -> true)
val descriptions = sc.binaryFiles(".../descriptions")
// same as before, but the description is not read yet
.map{ case (path, descFile) => {
val fileName = path.split("/").last
val id = "[0-9]+".r.findFirstIn(fileName)
id.get.toLong -> descFile
}} // inner join with the ids we are interested in
.join(idRDD)
.map{ case(id, (file, _)) => id -> file}
// reading the files
.mapValues(file => {
val reader = scala.io.Source.fromInputStream(file.open)
val desc = reader.getLines.mkString("\n")
reader.close
desc
})
.toDF("id", "description")
val result = df1.join(descriptions, Seq("id"))