, пожалуйста, помогите:
я запускаю код pyspark для basi c etl с размерами данных объединений около 270 ГБ, код улья работает для завершения процесса, когда я преобразовал его в spark sql я получаю следующую ошибку, есть ли способ ее исправить.
- Я пробовал подсказки искривления и увеличения числа разделов
- нужно ли разбивать данные на меньшие и обрабатывать его.
во время выполнения конфигурации:
--name "xx" --num-executors 3 --executor-cores 5 --executor-memory 6G --driver-memory 9G --deploy-mode cluster --master yarn --queue xx --conf spark.yarn.nodemanager.local-dirs=$xx/tmp/tmp1 --conf spark.executor.memoryOverhead=20300 --conf spark.driver.maxResultSize=0 --conf spark.yarn.submit.waitAppCompletion=true --conf spark.sql.parquet.binaryAsString=true --conf spark.yarn.am.attemptFailuresValidityInterval=24h --conf spark.executor.heartbeatInterval=60s --conf spark.network.timeout=1200s
спасибо за помощь.
снимок экрана spark ui:
org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 536870912, max: 536870912)
запрос (они не являются непрерывными и имена столбцов являются изменениями): я добавил перераспределение после каждого присоединения к улучшить данные
xxx=sqlContext.sql(""" select a.*,lpad(b.col1,12,0) as col2 from
data_3 as a LEFT JOIN
(select col1, cm13 from linkage_data) as b on a.cm13=b.cm13 """).repartition(400)
model_score=sqlContext.sql(""" select /*+ SKEW('a', ('market', 'cmpl_yr')) */ a.*,b.model_score as model_score from
(select * from combined_data_temp) as a
LEFT JOIN
%s.campaign_data as b
ON
trim(a.campno)=trim(b.cmpgn_no) AND
trim(a.campdate)=trim(b.cmpl_dt) AND
a.cm15=b.cm15 AND
a.campaign_id=b.cmpgn_id AND
a.execution_id=b.exec_id AND
a.offer_id=b.offr_id AND
trim(a.market)=trim(b.mkt_cd) AND
a.cmpl_yr=b.cmpl_yr """% (dbin1)).repartition(400)
combined_data_srg=sqlContext.sql(""" select
a.*,
b.*,
concat((b.srg_no),'0000',(a.supp_no),'00') as xy
from
data_temp3 as a
LEFT JOIN
srg_no as b
ON
trim(a.cm11)=trim(b.no) """).repartition(400)