Spark SQL - задержка между заданиями (Broadcast DataFrame) - PullRequest
0 голосов
/ 31 октября 2019

У меня есть приложение, которое обрабатывает 8 кадров данных в параллельной итерации. Работа выполнялась нормально, пока я не включил объединение этих фреймов данных с двумя действительно небольшими фреймами (менее 1 КБ), которые генерируются при чтении CSV-файлов. После включения объединения время выполнения приложения значительно увеличилось (более чем на 100%). Глядя на веб-интерфейс spark, я обнаружил, что некоторые работы со следующим описанием выполняются в ThreadPoolExecutor.java:1149. Эти задания отвечают за трансляцию очень маленьких информационных фреймов. Эти задания выполняются для каждого небольшого информационного кадра для каждого параллельного выполнения (16 раз). А задержка для каждого фрагмента выполнения ThreadPoolExecutor.java:1149 составляет около 4 минут. Проблема увеличивается, в то же время я добавил новый небольшой фрейм данных, который соединяется с 8 параллельными фреймами данных.

Для распараллеливания фреймов данных я создал список [DataFrame] .par

-Здесьобъект для чтения и создания кадров данных csv:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

// s"${dataBaseParameters.viewConfigurationDir}view_classification_by_path_clean.csv
 //view_classification_by_subtype.csv"
 //view_classification_by_type.csv"

object ClassificationOverwriteLoader {

 def loadViewClassification (spark: SparkSession, schema: StructType, filePath: String): DataFrame = {
   spark.read
     .format("csv")
     .option("header", "true")
     .option("delimiter", ";")
     .schema(schema)
     .load(filePath)
 }

 val ByPathSchema = StructType(
   Seq(
     StructField("path_clean", StringType,true),
     StructField("group_override_by_path", StringType, true),
     StructField("type_override_by_path", StringType, true),
     StructField("subtype_override_by_path", StringType, true),
     StructField("is_active", BooleanType,true),
     StructField("row_created_date", DateType,true),
     StructField("row_updated_date", DateType,true),
     StructField("row_created_by", StringType,true),
     StructField("row_updated_by", StringType,true)
   )
 )

 val ByTypeSubTypeSchema = StructType(
   Seq(
     StructField("group_override", StringType, true),
     StructField("type_source", StringType, true),
     StructField("type_override", StringType, true),
     StructField("subtype_source", StringType, true),
     StructField("subtype_override", StringType, true),
     StructField("is_active", BooleanType,true),
     StructField("row_created_date", DateType,true),
     StructField("row_updated_date", DateType,true),
     StructField("row_created_by", StringType,true),
     StructField("row_updated_by", StringType,true)
   )
 )

}

Spark UI с задержкой между заданиями

Я запускаю это задание в кластере Google DataProc,и все данные хранятся в файловой системе Google (включая файлы CSV). Есть ли способ избежать этой задержки планирования?

...