У нас есть корзина AWS S3 с миллионами документов в сложной иерархии, и CSV-файл со ссылками (среди прочих данных) на подмножество этих файлов, по моим оценкам, этот файл будет содержать от 1000 до 10.000 строк. Мне нужно объединить данные из файла CSV с содержимым документов для дальнейшей обработки в Spark. Если это имеет значение, мы используем Scala и Spark 2.4.4 в кластере Amazon EMR 6.0.0.
Я могу придумать два способа сделать это. Во-первых, это добавить преобразование в CSV DataFrame
, которое добавляет содержимое в качестве нового столбца:
val df = spark.read.format("csv").load("<csv file>")
val attempt1 = df.withColumn("raw_content", spark.sparkContext.textFile($"document_url"))
или его варианты (например, оборачивая его в udf
), кажется, не работать, я думаю, потому что sparkContext.textFile
возвращает RDD, так что я не уверен, что он вообще должен работать таким образом? Даже если я заставлю это работать, лучший ли способ сохранить его производительность в Spark?
Альтернатива, о которой я пытался придумать, - это использовать spark.sparkContext.wholeTextFiles
заранее, а затем соединить два кадра данных:
val df = spark.read.format("csv").load("<csv file>")
val contents = spark.sparkContext.wholeTextFiles("<s3 bucket>").toDF("document_url", "raw_content");
val attempt2 = df.join(contents, df("document_url") === contents("document_url"), "left")
, но wholeTextFiles
не go в подкаталоги, и необходимые пути предсказать сложно, и я также не уверен в влиянии на производительность при попытке создать RDD из всего сегмента миллионов файлов если мне понадобится лишь небольшая часть этого, поскольку API S3, вероятно, не позволяет очень быстро перечислить все объекты в корзине.
Есть идеи? Спасибо!