Производительность тюнинга в свечах - PullRequest
2 голосов
/ 25 апреля 2020

Я запускаю искровое задание, которое обрабатывает около 2 ТБ данных. Обработка включает в себя:

  1. Чтение данных (файлы avrò)
  2. Разнесение по столбцу типа карты
  3. OrderBy ключ из развернутого столбца
  4. Фильтруйте DataFrame (у меня есть очень маленький (7) набор ключей (назовите его keyset), для которого я хочу отфильтровать df). Я делаю df.filter(col("key").isin(keyset: _*) )
  5. Я записываю этот df в паркет (этот фрейм данных очень мал)
  6. Затем я снова фильтрую исходный фрейм данных для всех ключей, которых нет в наборе ключей df.filter(!col("key").isin(keyset: _*) ) и напиши это на паркете. Это большой набор данных.

Исходные данные avro составляют около 2 ТБ. Обработка занимает около 1 часа. Я хотел бы оптимизировать это. Я кэширую фрейм данных после шага 3, используя размер раздела shuffle 6000. min executors = 1000, max = 2000, память executor = 20 G, ядро ​​executor = 2. Любые другие предложения по оптимизации? Будет ли левое соединение более эффективным, чем фильтр?

Ответы [ 3 ]

4 голосов
/ 25 апреля 2020

Все смотрят прямо на меня. Если у вас небольшой набор данных, то isin в порядке.

1) Убедитесь, что вы можете увеличить количество ядер. ядро executor = 5

Для каждого исполнителя не рекомендуется более 5 ядер. Это основано на исследовании, в котором любое приложение с более чем 5-ю параллельными потоками начало бы снижать производительность.

2) Убедитесь, что у вас есть хорошее / равномерное разбиение разделов.

Пример (только для отладки цель не для производства):

  import org.apache.spark.sql.functions.spark_partition_id
  yourcacheddataframe.groupBy(spark_partition_id).count.show()

Это напечатает номер раздела искры и сколько записей существует в каждом разделе. основываясь на том, что вы можете перераспределить, если вы хотите больше парлелизма.

3) spark.dynamicAllocation.enabled может быть другим вариантом.

Например:

spark-submit --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=100 --conf spark.shuffle.service.enabled=true

вместе со всеми другие необходимые реквизиты ..... вот для этой работы. Если вы предоставите эти реквизиты в spark-default.conf, они будут применяться для всех заданий.

При всех этих вышеупомянутых параметрах время обработки может сократиться.

0 голосов
/ 26 апреля 2020

В дополнение к тому, что было упомянуто, несколько советов в зависимости от ваших требований и кластера:

  1. Если задание может выполняться с памятью исполнителя 20 г и 5 ядрами, вы можете уместить больше рабочие, уменьшая память исполнителя и сохраняя 5 ядер
  2. Требуется ли порядок на самом деле? Spark обеспечивает упорядочение строк внутри разделов, но не между разделами, что обычно не очень полезно.
  3. Обязательно ли файлы должны находиться в определенных c местах? Если нет, добавление
df.withColumn("in_keyset", when( col('key').isin(keyset), lit(1)).otherwise(lit(0)). \
write.partitionBy("in_keyset").parquet(...)

может ускорить операцию, чтобы предотвратить чтение данных в 2 раза. Параметр partitionBy гарантирует, что элементы набора ключей находятся в другом каталоге, чем другие ключи.

0 голосов
/ 26 апреля 2020

spark.dynamicAllocation.enabled включен

размеры разделов довольно неравномерны (в зависимости от размера выходных файлов паркетных деталей), так как я делаю клавишу orderBy, а некоторые ключи встречаются чаще, чем другие.

набор ключей - очень маленький набор (7 элементов)

...