У меня есть приложение, которое обрабатывает 8 кадров данных в параллельной итерации. Работа выполнялась нормально, пока я не включил объединение этих фреймов данных с двумя действительно небольшими фреймами (менее 1 КБ), которые генерируются при чтении CSV-файлов. После включения объединения время выполнения приложения значительно увеличилось (более чем на 100%). Глядя на веб-интерфейс spark, я обнаружил, что некоторые работы со следующим описанием выполняются в ThreadPoolExecutor.java:1149. Эти задания отвечают за трансляцию очень маленьких информационных фреймов. Эти задания выполняются для каждого небольшого информационного кадра для каждого параллельного выполнения (16 раз). А задержка для каждого фрагмента выполнения ThreadPoolExecutor.java:1149 составляет около 4 минут. Проблема увеличивается, в то же время я добавил новый небольшой фрейм данных, который соединяется с 8 параллельными фреймами данных.
Для распараллеливания фреймов данных я создал список [DataFrame] .par
-Здесьобъект для чтения и создания кадров данных csv:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
// s"${dataBaseParameters.viewConfigurationDir}view_classification_by_path_clean.csv
//view_classification_by_subtype.csv"
//view_classification_by_type.csv"
object ClassificationOverwriteLoader {
def loadViewClassification (spark: SparkSession, schema: StructType, filePath: String): DataFrame = {
spark.read
.format("csv")
.option("header", "true")
.option("delimiter", ";")
.schema(schema)
.load(filePath)
}
val ByPathSchema = StructType(
Seq(
StructField("path_clean", StringType,true),
StructField("group_override_by_path", StringType, true),
StructField("type_override_by_path", StringType, true),
StructField("subtype_override_by_path", StringType, true),
StructField("is_active", BooleanType,true),
StructField("row_created_date", DateType,true),
StructField("row_updated_date", DateType,true),
StructField("row_created_by", StringType,true),
StructField("row_updated_by", StringType,true)
)
)
val ByTypeSubTypeSchema = StructType(
Seq(
StructField("group_override", StringType, true),
StructField("type_source", StringType, true),
StructField("type_override", StringType, true),
StructField("subtype_source", StringType, true),
StructField("subtype_override", StringType, true),
StructField("is_active", BooleanType,true),
StructField("row_created_date", DateType,true),
StructField("row_updated_date", DateType,true),
StructField("row_created_by", StringType,true),
StructField("row_updated_by", StringType,true)
)
)
}
Spark UI с задержкой между заданиями
Я запускаю это задание в кластере Google DataProc,и все данные хранятся в файловой системе Google (включая файлы CSV). Есть ли способ избежать этой задержки планирования?