моя искровая работа в данный момент выполняется за 59 минутЯ хочу оптимизировать его, чтобы он занимал меньше времени. Я заметил, что последний шаг работы занимает много времени (55 минут) ( см. Скриншоты работы с искрой в интерфейсе Spark ниже ).
Мне нужно присоединиться кбольшой набор данных с меньшим, примените преобразования к этому объединенному набору данных (создайте новый столбец).
В конце я должен перераспределить набор данных на основе столбца PSP
( см. фрагменткод ниже ). Я также выполняю сортировку в конце (сортировка каждого раздела на основе 3 столбцов).
Все подробности (инфраструктура, конфигурация, код) можно найти ниже.
Фрагмент моего кода:
spark.conf.set("spark.sql.shuffle.partitions", 4158)
val uh = uh_months
.withColumn("UHDIN", datediff(to_date(unix_timestamp(col("UHDIN_YYYYMMDD"), "yyyyMMdd").cast(TimestampType)),
to_date(unix_timestamp(col("january"), "yyyy-MM-dd").cast(TimestampType))))
"ddMMMyyyy")).cast(TimestampType)))
.withColumn("DVA_1", date_format(col("DVA"), "dd/MM/yyyy"))
.drop("UHDIN_YYYYMMDD")
.drop("january")
.drop("DVA")
.persist()
val uh_flag_comment = new TransactionType().transform(uh)
uh.unpersist()
val uh_joined = uh_flag_comment.join(broadcast(smallDF), "NO_NUM")
.select(
uh.col("*"),
smallDF.col("PSP"),
smallDF.col("minrel"),
smallDF.col("Label"),
smallDF.col("StartDate"))
.withColumnRenamed("DVA_1", "DVA")
smallDF.unpersist()
val uh_to_be_sorted = uh_joined.repartition(4158, col("PSP"))
val uh_final = uh_joined.sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))
uh_final
smallDF
- это небольшой набор данных (535 МБ), который я передаю.
TransactionType
- это класс, в который я добавляю новый столбец строковых элементов в мой uh
dataframeосновываясь на значении 3 столбцов (MMED
, DEBCRED
, NMTGP
), проверяя значения этих столбцов с помощью регулярных выражений.
Ранее я сталкивался с множеством проблем (сбой задания) из-за перемешиванияблоки, которые не были найдены. Я обнаружил, что выполнил на диск и имел много проблем с памятью GC, поэтому я увеличил "spark.sql.shuffle.partitions" до 4158.
WHY 4158?
Partition_count = (stage input data) / (target size of your partition)
так Shuffle partition_count = (shuffle stage input data) / 200 MB = 860000/200=4300
У меня 16*24 - 6 =378 cores availaible
. Поэтому, если я хочу выполнить все задачи за один раз, я должен разделить 4300 на 378, что примерно равно 11. Тогда 11 * 378 = 4158
Версия Spark: 2.1
Конфигурация кластера:
- 24 вычислительных узла (рабочие)
- 16 виртуальных машин каждый
- 90 ГБ ОЗУ на узел
- 6 ядер уже используютсяиспользуется другими процессами / заданиями
Текущая конфигурация Spark:
-мастер: пряжа
-executor-memory: 26G
-executor-ядер: 5
-драйвер памяти: 70G
-num исполнителей: 70
-spark.kryoserializer.buffer.max = 512
-spark.driver.cores = 5
-spark.driver.maxResultSize = 500 м
-spark.memory.storageFraction = 0,4
-spark.memory.fraction = 0,9
-spark.hadoop.fs.permissions.umask-mode = 007
Как выполняется задание:
Мы создаем артефакт (банку) с IntelliJ и затем отправляем его всервер. Затем выполняется скрипт bash. Этот скрипт:
экспортирует некоторые переменные среды (SPARK_HOME, HADOOP_CONF_DIR, PATH и SPARK_LOCAL_DIRS)
запускает команду spark-submit со всеми параметрамиопределенный в конфигурации искры выше
извлекает журналы пряжи приложения
снимки экрана Spark UI
DAG