Оптимизация работы Spark - Spark 2.1 - PullRequest
0 голосов
/ 22 октября 2019

моя искровая работа в данный момент выполняется за 59 минутЯ хочу оптимизировать его, чтобы он занимал меньше времени. Я заметил, что последний шаг работы занимает много времени (55 минут) ( см. Скриншоты работы с искрой в интерфейсе Spark ниже ).

Мне нужно присоединиться кбольшой набор данных с меньшим, примените преобразования к этому объединенному набору данных (создайте новый столбец).

В конце я должен перераспределить набор данных на основе столбца PSP ( см. фрагменткод ниже ). Я также выполняю сортировку в конце (сортировка каждого раздела на основе 3 столбцов).

Все подробности (инфраструктура, конфигурация, код) можно найти ниже.

Фрагмент моего кода:

    spark.conf.set("spark.sql.shuffle.partitions", 4158)

    val uh = uh_months
      .withColumn("UHDIN", datediff(to_date(unix_timestamp(col("UHDIN_YYYYMMDD"), "yyyyMMdd").cast(TimestampType)),
        to_date(unix_timestamp(col("january"), "yyyy-MM-dd").cast(TimestampType))))
"ddMMMyyyy")).cast(TimestampType)))
      .withColumn("DVA_1", date_format(col("DVA"), "dd/MM/yyyy"))
      .drop("UHDIN_YYYYMMDD")
      .drop("january")
      .drop("DVA")
      .persist()

    val uh_flag_comment = new TransactionType().transform(uh)
    uh.unpersist()

    val uh_joined = uh_flag_comment.join(broadcast(smallDF), "NO_NUM")
      .select(
        uh.col("*"),
        smallDF.col("PSP"),
        smallDF.col("minrel"),
        smallDF.col("Label"),
        smallDF.col("StartDate"))
      .withColumnRenamed("DVA_1", "DVA")

    smallDF.unpersist()

    val uh_to_be_sorted = uh_joined.repartition(4158, col("PSP"))
    val uh_final = uh_joined.sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))

    uh_final

smallDF - это небольшой набор данных (535 МБ), который я передаю.

TransactionType - это класс, в который я добавляю новый столбец строковых элементов в мой uh dataframeосновываясь на значении 3 столбцов (MMED, DEBCRED, NMTGP), проверяя значения этих столбцов с помощью регулярных выражений.

Ранее я сталкивался с множеством проблем (сбой задания) из-за перемешиванияблоки, которые не были найдены. Я обнаружил, что выполнил на диск и имел много проблем с памятью GC, поэтому я увеличил "spark.sql.shuffle.partitions" до 4158.

WHY 4158?

Partition_count = (stage input data) / (target size of your partition)

так Shuffle partition_count = (shuffle stage input data) / 200 MB = 860000/200=4300

У меня 16*24 - 6 =378 cores availaible. Поэтому, если я хочу выполнить все задачи за один раз, я должен разделить 4300 на 378, что примерно равно 11. Тогда 11 * 378 = 4158

Версия Spark: 2.1

Конфигурация кластера:

  • 24 вычислительных узла (рабочие)
  • 16 виртуальных машин каждый
  • 90 ГБ ОЗУ на узел
  • 6 ядер уже используютсяиспользуется другими процессами / заданиями

Текущая конфигурация Spark:

-мастер: пряжа

-executor-memory: 26G

-executor-ядер: 5

-драйвер памяти: 70G

-num исполнителей: 70

-spark.kryoserializer.buffer.max = 512

-spark.driver.cores = 5

-spark.driver.maxResultSize = 500 м

-spark.memory.storageFraction = 0,4

-spark.memory.fraction = 0,9

-spark.hadoop.fs.permissions.umask-mode = 007

Как выполняется задание:

Мы создаем артефакт (банку) с IntelliJ и затем отправляем его всервер. Затем выполняется скрипт bash. Этот скрипт:

  • экспортирует некоторые переменные среды (SPARK_HOME, HADOOP_CONF_DIR, PATH и SPARK_LOCAL_DIRS)

  • запускает команду spark-submit со всеми параметрамиопределенный в конфигурации искры выше

  • извлекает журналы пряжи приложения

снимки экрана Spark UI

DAG

DAG

Stages all jobs

Detailed stages of the job to improve

Stage that takes a lot of time

1 Ответ

1 голос
/ 22 октября 2019

@ Ali

Из сводных метрик можно сказать, что ваши данные искажены (максимальная продолжительность: 49 минут и максимальный размер чтения в случайном порядке / записи: 2,5 ГБ / 23 947 440, где в среднем это занимает около 4-5 минут и обработка менее 200 МБ / 1,2 ММ строк)

Теперь, когда мы знаем, что проблема может заключаться в перекосе данных в нескольких разделах, я думаю, мы можем это исправить, изменив логику перераспределения val uh_to_be_sorted = uh_joined.repartition(4158, col("PSP")) выбрав что-то (например, какой-нибудь другой столбец или добавив любой другой столбец в PSP)

несколько ссылок для ссылки на перекос данных и исправление

https://dzone.com/articles/optimize-spark-with-distribute-by-cluster-by

https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/

Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...