Искры shuffle.FetchFailedException с данными 260 ГБ - PullRequest
1 голос
/ 31 марта 2020

, пожалуйста, помогите:

я запускаю код pyspark для basi c etl с размерами данных объединений около 270 ГБ, код улья работает для завершения процесса, когда я преобразовал его в spark sql я получаю следующую ошибку, есть ли способ ее исправить.

  • Я пробовал подсказки искривления и увеличения числа разделов
  • нужно ли разбивать данные на меньшие и обрабатывать его.

во время выполнения конфигурации:

--name "xx" --num-executors 3 --executor-cores 5 --executor-memory 6G --driver-memory 9G --deploy-mode cluster  --master yarn --queue xx --conf spark.yarn.nodemanager.local-dirs=$xx/tmp/tmp1 --conf spark.executor.memoryOverhead=20300 --conf spark.driver.maxResultSize=0 --conf spark.yarn.submit.waitAppCompletion=true --conf spark.sql.parquet.binaryAsString=true  --conf spark.yarn.am.attemptFailuresValidityInterval=24h --conf spark.executor.heartbeatInterval=60s --conf spark.network.timeout=1200s

спасибо за помощь.

снимок экрана spark ui: enter image description here

org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 536870912, max: 536870912)

full spark ui

запрос (они не являются непрерывными и имена столбцов являются изменениями): я добавил перераспределение после каждого присоединения к улучшить данные

              xxx=sqlContext.sql(""" select a.*,lpad(b.col1,12,0) as col2 from
    data_3 as a LEFT JOIN
    (select col1, cm13 from linkage_data) as b on a.cm13=b.cm13 """).repartition(400)


 model_score=sqlContext.sql(""" select /*+ SKEW('a', ('market', 'cmpl_yr')) */ a.*,b.model_score as model_score from
    (select * from combined_data_temp) as a 
    LEFT JOIN
    %s.campaign_data as b 
    ON 
    trim(a.campno)=trim(b.cmpgn_no) AND 
    trim(a.campdate)=trim(b.cmpl_dt) AND 
    a.cm15=b.cm15 AND
    a.campaign_id=b.cmpgn_id AND 
    a.execution_id=b.exec_id AND 
    a.offer_id=b.offr_id AND
    trim(a.market)=trim(b.mkt_cd) AND 
    a.cmpl_yr=b.cmpl_yr """% (dbin1)).repartition(400)

      combined_data_srg=sqlContext.sql(""" select 
    a.*,
    b.*, 
    concat((b.srg_no),'0000',(a.supp_no),'00') as xy 
    from
    data_temp3 as a 
    LEFT JOIN 
    srg_no as b 
    ON 
    trim(a.cm11)=trim(b.no) """).repartition(400)

1 Ответ

1 голос
/ 01 апреля 2020

--executor-memory 6G --driver-memory 9G может быть ошибочной конфигурацией, я думаю, что она должна быть обратной. В общем случае память исполнителя должна быть больше, чем память драйвера

Когда исполнителю не хватает памяти, может произойти следующее исключение происходить.

Разрешение: рассмотрите следующие шаги в параметрах командной строки Spark Submit:

Установите более высокое значение для памяти, используя одну из следующих команд:

--conf spark.executor.memory= <XX>g

ИЛИ

--executor-memory <XX>G

Увеличьте количество разделов в случайном порядке, используя следующую команду:

см. Производительность Настройка документов из плагина .. spark.sql.shuffle.partitions: настройка количества разделов для использования при перетасовке данных для объединений или агрегатов.

--spark.sql.shuffle.partitions

также рассмотрите настройку этих свойств

SET spark.reducer.maxReqsInFlight=1;  -- Only pull one file at a time to use full network bandwidth.
SET spark.shuffle.io.retryWait=60s;  -- Increase the time to wait while retrieving shuffle partitions before retrying. Longer times are necessary for larger files.
SET spark.shuffle.io.maxRetries=10
  • Отфильтруйте ненужные данные перед перемешиванием, например, при выборе столбцов запроса, если исходные данные имеют 30 полей, при условии, что вы выбираете обязательные поля для обработки, это уменьшит определенное значение Fle данные. Здесь я говорю о a.*, b.* select * в вас sql запросов

  • Всегда используйте небольшую таблицу для трансляции BHJ для левого соединения, которое вы использовали. Например: если вы можете транслировать, перемешивание может быть значительно уменьшено.

sparkConfiguration.set("spark.sql.autoBroadcastJoinThreshold", "your broadcast size") // default 10mb

 Result = BigTable.join(
  org.apache.spark.sql.functions.broadcast(SmallTable),
  Seq("X", "Y", "Z", "W", "V"),
  "left_outer"
)

см. Это Оптимизация объединения DataFrame - Broadcast Ha sh Join

...