Чтение содержимого файла для каждой строки Spark DataFrame - PullRequest
0 голосов
/ 09 апреля 2020

У нас есть корзина AWS S3 с миллионами документов в сложной иерархии, и CSV-файл со ссылками (среди прочих данных) на подмножество этих файлов, по моим оценкам, этот файл будет содержать от 1000 до 10.000 строк. Мне нужно объединить данные из файла CSV с содержимым документов для дальнейшей обработки в Spark. Если это имеет значение, мы используем Scala и Spark 2.4.4 в кластере Amazon EMR 6.0.0.

Я могу придумать два способа сделать это. Во-первых, это добавить преобразование в CSV DataFrame, которое добавляет содержимое в качестве нового столбца:

val df = spark.read.format("csv").load("<csv file>")
val attempt1 = df.withColumn("raw_content", spark.sparkContext.textFile($"document_url"))

или его варианты (например, оборачивая его в udf), кажется, не работать, я думаю, потому что sparkContext.textFile возвращает RDD, так что я не уверен, что он вообще должен работать таким образом? Даже если я заставлю это работать, лучший ли способ сохранить его производительность в Spark?

Альтернатива, о которой я пытался придумать, - это использовать spark.sparkContext.wholeTextFiles заранее, а затем соединить два кадра данных:

val df = spark.read.format("csv").load("<csv file>")
val contents = spark.sparkContext.wholeTextFiles("<s3 bucket>").toDF("document_url", "raw_content");
val attempt2 = df.join(contents, df("document_url") === contents("document_url"), "left")

, но wholeTextFiles не go в подкаталоги, и необходимые пути предсказать сложно, и я также не уверен в влиянии на производительность при попытке создать RDD из всего сегмента миллионов файлов если мне понадобится лишь небольшая часть этого, поскольку API S3, вероятно, не позволяет очень быстро перечислить все объекты в корзине.

Есть идеи? Спасибо!

1 Ответ

0 голосов
/ 09 апреля 2020

В конце концов я нашел решение:

val df = spark.read.format("csv").load("<csv file>")
val allS3Links = df.map(row => row.getAs[String]("document_url")).collect()
val joined = allS3Links.mkString(",")
val contentsDF = spark.sparkContext.wholeTextFiles(joined).toDF("document_url", "raw_content");

Недостатком этого решения является то, что оно вытягивает все URL-адреса к драйверу, но в моем случае это выполнимо (100 000 * ~ 100 символов длина строки не так уж и велика) и, возможно, даже неизбежна.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...