У меня есть много сценариев кустов (примерно 20-25 сценариев), каждый из которых имеет несколько запросов. Я хочу запускать эти сценарии с помощью spark, чтобы процесс мог выполняться быстрее. Поскольку работа по уменьшению карты в улье занимает много времени для выполнения от искры, это будет намного быстрее. Ниже приведен код, который я написал, но он работает для 3-4 файлов, но при получении нескольких файлов с несколькими запросами его получение не удалось.
Ниже приведен код для того же. Пожалуйста, помогите мне, если возможно, оптимизировать то же самое.
val spark = SparkSession.builder.master("yarn").appName("my app").enableHiveSupport().getOrCreate()
val filename = new java.io.File("/mapr/tmp/validation_script/").listFiles.filter(_.getName.endsWith(".hql")).toList
for ( i <- 0 to filename.length - 1)
{
val filename1 = filename(i)
scala.io.Source.fromFile(filename1).getLines()
.filterNot(_.isEmpty) // filter out empty lines
.foreach(query =>
spark.sql(query))
}
некоторые ошибки, которые я получаю, похожи на
ERROR SparkSubmit: Job aborted.
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
ERROR FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 12 (sql at validationtest.scala:67) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 1023410176, max: 1029177344) at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:528)
много разных типов ошибок, которые я получаю, когда запускаю один и тот же код несколькораз.
Ниже показано, как выглядит один из файлов HQL. его имя xyz.hql и имеет
drop table pontis_analyst.daydiff_log_sms_distribution
create table pontis_analyst.daydiff_log_sms_distribution as select round(datediff(date_sub(current_date(),cast(date_format(CURRENT_DATE ,'u') as int) ),cast(subscriberActivationDate as date))/7,4) as daydiff,subscriberkey as key from pontis_analytics.prepaidsubscriptionauditlog
drop table pontis_analyst.weekly_sms_usage_distribution
create table pontis_analyst.weekly_sms_usage_distribution as select sum(event_count_ge) as eventsum,subscriber_key from pontis_analytics.factadhprepaidsubscriptionsmsevent where effective_date_ge_prt < date_sub(current_date(),cast(date_format(CURRENT_DATE ,'u') as int) - 1 ) and effective_date_ge_prt >= date_sub(date_sub(current_date(),cast(date_format(CURRENT_DATE ,'u') as int) ),84) group by subscriber_key;
drop table pontis_analyst.daydiff_sms_distribution
create table pontis_analyst.daydiff_sms_distribution as select day.daydiff,sms.subscriber_key,sms.eventsum from pontis_analyst.daydiff_log_sms_distribution day inner join pontis_analyst.weekly_sms_usage_distribution sms on day.key=sms.subscriber_key
drop table pontis_analyst.weekly_sms_usage_final_distribution
create table pontis_analyst.weekly_sms_usage_final_distribution as select spp.subscriberkey as key, case when spp.tenure < 3 then round((lb.eventsum )/dayDiff,4) when spp.tenure >= 3 then round(lb.eventsum/12,4)end as result from pontis_analyst.daydiff_sms_distribution lb inner join pontis_analytics.prepaidsubscriptionsubscriberprofilepanel spp on spp.subscriberkey = lb.subscriber_key
INSERT INTO TABLE pontis_analyst.validatedfinalResult select 'prepaidsubscriptionsubscriberprofilepanel' as fileName, 'average_weekly_sms_last_12_weeks' as attributeName, tbl1_1.isEqual as isEqual, tbl1_1.isEqualCount as isEqualCount, tbl1_2.countAll as countAll, (tbl1_1.isEqualCount/tbl1_2.countAll)* 100 as percentage from (select tbl1_0.isEqual as isEqual, count(isEqual) as isEqualCount from (select case when round(aal.result) = round(srctbl.average_weekly_sms_last_12_weeks) then 1 when aal.result is null then 1 when aal.result = 'NULL' and srctbl.average_weekly_sms_last_12_weeks = '' then 1 when aal.result = '' and srctbl.average_weekly_sms_last_12_weeks = '' then 1 when aal.result is null and srctbl.average_weekly_sms_last_12_weeks = '' then 1 when aal.result is null and srctbl.average_weekly_sms_last_12_weeks is null then 1 else 0 end as isEqual from pontis_analytics.prepaidsubscriptionsubscriberprofilepanel srctbl left join pontis_analyst.weekly_sms_usage_final_distribution aal on srctbl.subscriberkey = aal.key) tbl1_0 group by tbl1_0.isEqual) tbl1_1 inner join (select count(*) as countAll from pontis_analytics.prepaidsubscriptionsubscriberprofilepanel) tbl1_2 on 1=1