Как перебрать множество скриптов Hive поверх spark - PullRequest
0 голосов
/ 18 октября 2019

У меня есть много сценариев кустов (примерно 20-25 сценариев), каждый из которых имеет несколько запросов. Я хочу запускать эти сценарии с помощью spark, чтобы процесс мог выполняться быстрее. Поскольку работа по уменьшению карты в улье занимает много времени для выполнения от искры, это будет намного быстрее. Ниже приведен код, который я написал, но он работает для 3-4 файлов, но при получении нескольких файлов с несколькими запросами его получение не удалось.

Ниже приведен код для того же. Пожалуйста, помогите мне, если возможно, оптимизировать то же самое.

val spark =  SparkSession.builder.master("yarn").appName("my app").enableHiveSupport().getOrCreate()

val filename = new java.io.File("/mapr/tmp/validation_script/").listFiles.filter(_.getName.endsWith(".hql")).toList

for ( i <- 0 to filename.length - 1)
{
  val filename1 = filename(i)

    scala.io.Source.fromFile(filename1).getLines()
  .filterNot(_.isEmpty)  // filter out empty lines
  .foreach(query =>
      spark.sql(query))


}

некоторые ошибки, которые я получаю, похожи на

ERROR SparkSubmit: Job aborted.
org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)

ERROR FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 12 (sql at validationtest.scala:67) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 1023410176, max: 1029177344)     at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:528)

много разных типов ошибок, которые я получаю, когда запускаю один и тот же код несколькораз.

Ниже показано, как выглядит один из файлов HQL. его имя xyz.hql и имеет

drop table pontis_analyst.daydiff_log_sms_distribution
create table pontis_analyst.daydiff_log_sms_distribution as select round(datediff(date_sub(current_date(),cast(date_format(CURRENT_DATE ,'u') as int) ),cast(subscriberActivationDate as date))/7,4) as daydiff,subscriberkey as key from  pontis_analytics.prepaidsubscriptionauditlog
drop table pontis_analyst.weekly_sms_usage_distribution
create table pontis_analyst.weekly_sms_usage_distribution as select sum(event_count_ge) as eventsum,subscriber_key from pontis_analytics.factadhprepaidsubscriptionsmsevent where effective_date_ge_prt < date_sub(current_date(),cast(date_format(CURRENT_DATE ,'u') as int) - 1 ) and effective_date_ge_prt >=  date_sub(date_sub(current_date(),cast(date_format(CURRENT_DATE ,'u') as int) ),84) group by subscriber_key;
drop table pontis_analyst.daydiff_sms_distribution
create table pontis_analyst.daydiff_sms_distribution as select day.daydiff,sms.subscriber_key,sms.eventsum from  pontis_analyst.daydiff_log_sms_distribution day inner join pontis_analyst.weekly_sms_usage_distribution sms on day.key=sms.subscriber_key
drop table pontis_analyst.weekly_sms_usage_final_distribution
create table pontis_analyst.weekly_sms_usage_final_distribution as select spp.subscriberkey as key, case when spp.tenure < 3 then round((lb.eventsum )/dayDiff,4) when spp.tenure >= 3 then round(lb.eventsum/12,4)end as result from pontis_analyst.daydiff_sms_distribution lb inner join pontis_analytics.prepaidsubscriptionsubscriberprofilepanel spp on spp.subscriberkey = lb.subscriber_key
INSERT INTO TABLE pontis_analyst.validatedfinalResult select 'prepaidsubscriptionsubscriberprofilepanel' as fileName, 'average_weekly_sms_last_12_weeks' as attributeName, tbl1_1.isEqual as isEqual, tbl1_1.isEqualCount as isEqualCount, tbl1_2.countAll as countAll, (tbl1_1.isEqualCount/tbl1_2.countAll)* 100 as percentage from (select tbl1_0.isEqual as isEqual, count(isEqual) as isEqualCount from (select case when round(aal.result)  = round(srctbl.average_weekly_sms_last_12_weeks) then 1 when aal.result is null then 1 when aal.result = 'NULL' and srctbl.average_weekly_sms_last_12_weeks = '' then 1 when aal.result = '' and srctbl.average_weekly_sms_last_12_weeks = '' then 1 when aal.result is null and srctbl.average_weekly_sms_last_12_weeks = '' then 1 when aal.result is null and srctbl.average_weekly_sms_last_12_weeks is null then 1 else 0  end as isEqual from pontis_analytics.prepaidsubscriptionsubscriberprofilepanel srctbl left join  pontis_analyst.weekly_sms_usage_final_distribution aal on srctbl.subscriberkey = aal.key) tbl1_0 group by tbl1_0.isEqual) tbl1_1 inner join (select count(*) as countAll from pontis_analytics.prepaidsubscriptionsubscriberprofilepanel) tbl1_2 on 1=1

1 Ответ

1 голос
/ 19 октября 2019

Ваша проблема в том, что вашему коду не хватает памяти, как показано ниже

failed to allocate 16777216 byte(s) of direct memory (used: 1023410176, max: 1029177344)

Хотя то, что вы пытаетесь сделать, не является оптимальным способом выполнения действий в Spark, но я быРекомендую удалить сериализацию памяти, так как в любом случае это не поможет. Вам следует кэшировать данные только в том случае, если они будут использоваться в нескольких преобразованиях. Если он будет использоваться один раз, нет причин помещать данные в кеш.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...