Написание паркетного файла, преобразованного в формат CSV, занимает более 4 часов - PullRequest
1 голос
/ 31 марта 2020

Недавно я начал новую работу, в которой мне приходится манипулировать данными с очень большим набором данных (207 миллиардов строк). Основа c предпосылки кода искры должна:

Импортировать все файлы паркета из каталога Azure Data Lake. Мы назовем этот файл «Большой файл».

Загрузите в CSV-файл с именем «Продукты», который содержит список продуктов, и отфильтруйте «Большой файл», где продукты существуют в обоих наборах. Пример: если в базе данных продуктов существует «база данных», сохраните запись в большом файле, в противном случае удалите запись из «большого файла».

Загрузите в файл CSV для сопоставления регионов, который добавляет дополнительные географические данные в «». Большой файл ". Пример: если в столбце страны «Большой файл» указана Япония в качестве страны, то в сопоставлении регионов csv добавляет APJ в качестве региона и JP в качестве кода страны.

После того, как вышеуказанные фильтры выполнены, я должен отфильтровать конкретный регион, такой как APJ, а затем отбрасывает дубликаты на основе всех столбцов.

Последним шагом будет экспорт этого фрейма данных в Azure Озеро данных в виде файла CSV. В идеале я хотел бы, чтобы этот вывод был 1 файлом вместо нескольких файлов CSV.

Когда я запускаю это задание, его выполнение занимает не менее 4 часов.

Я пытался сделать свое Проведите собственное исследование и попытайтесь сделать следующее, чтобы ускорить работу:

  1. Передача CSV меньшего региона для сопоставления с «Большим файлом»

  2. Подразумевает схему при чтении в «Большом файле».

  3. Сохранение и кэширование «Большого файла» после его чтения.

  4. Перепишите, используя искру sql, но производительность не изменилась.

  5. Попытка с использованием объединения,

df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("path")
Пробовал с помощью перераспределения и разбиения по
DF.repartition(100, $"country").write.option("header", "true").partitionBy("country").mode("overwrite").csv("path")

Вот некоторые фрагменты кода, если это поможет:

import org.apache.spark.sql.types.StructType
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val six_months_prior = DateTimeFormatter.ofPattern("yyyy-MM-dd").format(LocalDateTime.now.minusMonths(6))

val Big_File = spark.read.format("parquet").

load("Big_File_Path").

selectExpr("company_name as Company_Name",

"domain as URL",

"company_size as Company_Size",

"industry as Industry",

"domain_origin as Country",

"topic as Topic",

"record_date as Record_Date").filter(!($"Company_Name"==="") && ($"Record_Date" >= six_months_prior))

val Topics = spark.read.format("com.databricks.spark.csv").option("header", "true").load("azure path"). rdd.map(r => r(0)).collect().toList

val Bombora_Intent = Bombora_Intent.filter(($"Topic".isin(Topics:_*)))

val Region_Mapping = spark.read.format("com.databricks.spark.csv").option("header", true).load("Region_Path")

val df = Bombora_Intent.join(broadcast(Region_Mapping), Seq("Country")).select("Company_Name", "URL", "Company_Size", "Industry","Record_Date", "Region", "Country", "Country_Code")

val new_df = Bombora_Intent_SAP_Topics.filter($"Region" === "APJ").distinct()

new_df.write.option("header", "true").mode("overwrite").csv("azure_path")```

физический план:

Click here an img of the physical plan:

...