Недавно я начал новую работу, в которой мне приходится манипулировать данными с очень большим набором данных (207 миллиардов строк). Основа c предпосылки кода искры должна:
Импортировать все файлы паркета из каталога Azure Data Lake. Мы назовем этот файл «Большой файл».
Загрузите в CSV-файл с именем «Продукты», который содержит список продуктов, и отфильтруйте «Большой файл», где продукты существуют в обоих наборах. Пример: если в базе данных продуктов существует «база данных», сохраните запись в большом файле, в противном случае удалите запись из «большого файла».
Загрузите в файл CSV для сопоставления регионов, который добавляет дополнительные географические данные в «». Большой файл ". Пример: если в столбце страны «Большой файл» указана Япония в качестве страны, то в сопоставлении регионов csv добавляет APJ в качестве региона и JP в качестве кода страны.
После того, как вышеуказанные фильтры выполнены, я должен отфильтровать конкретный регион, такой как APJ, а затем отбрасывает дубликаты на основе всех столбцов.
Последним шагом будет экспорт этого фрейма данных в Azure Озеро данных в виде файла CSV. В идеале я хотел бы, чтобы этот вывод был 1 файлом вместо нескольких файлов CSV.
Когда я запускаю это задание, его выполнение занимает не менее 4 часов.
Я пытался сделать свое Проведите собственное исследование и попытайтесь сделать следующее, чтобы ускорить работу:
Передача CSV меньшего региона для сопоставления с «Большим файлом»
Подразумевает схему при чтении в «Большом файле».
Сохранение и кэширование «Большого файла» после его чтения.
Перепишите, используя искру sql, но производительность не изменилась.
Попытка с использованием объединения,
df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("path")
Пробовал с помощью перераспределения и разбиения по
DF.repartition(100, $"country").write.option("header", "true").partitionBy("country").mode("overwrite").csv("path")
Вот некоторые фрагменты кода, если это поможет:
import org.apache.spark.sql.types.StructType
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
val six_months_prior = DateTimeFormatter.ofPattern("yyyy-MM-dd").format(LocalDateTime.now.minusMonths(6))
val Big_File = spark.read.format("parquet").
load("Big_File_Path").
selectExpr("company_name as Company_Name",
"domain as URL",
"company_size as Company_Size",
"industry as Industry",
"domain_origin as Country",
"topic as Topic",
"record_date as Record_Date").filter(!($"Company_Name"==="") && ($"Record_Date" >= six_months_prior))
val Topics = spark.read.format("com.databricks.spark.csv").option("header", "true").load("azure path"). rdd.map(r => r(0)).collect().toList
val Bombora_Intent = Bombora_Intent.filter(($"Topic".isin(Topics:_*)))
val Region_Mapping = spark.read.format("com.databricks.spark.csv").option("header", true).load("Region_Path")
val df = Bombora_Intent.join(broadcast(Region_Mapping), Seq("Country")).select("Company_Name", "URL", "Company_Size", "Industry","Record_Date", "Region", "Country", "Country_Code")
val new_df = Bombora_Intent_SAP_Topics.filter($"Region" === "APJ").distinct()
new_df.write.option("header", "true").mode("overwrite").csv("azure_path")```
физический план: