У меня около 20-30 запросов в кусте на CDH 5.13.Чтобы оценить производительность, когда тот же запрос выполняется на спринге, я написал код, который читает запрос из файла и выполняет их на спарке, используя HiveContext.
Мой код Python, который выполняет запрос, выглядит примерно так:
#step-1: readQueryFromFile is custom function that reads file uploaded via spark-submit and accessible to this driver code ad finalQuery is printed
finalQuery = readQueryFromFile(sqlFile,dict)
print finalQuery
# step-2: prepare context
sparkCtx = SparkContext.getOrCreate(SparkConf())
hc = HiveContext(sparkCtx)
# step-3: run sql
rdd = hc.sql(finalQuery)
# step-4: called explicit action
rdd.show()
Все запросы в следующем формате.
create table [tablename] stored as parquet as select * from stg_[tablename]
С учетом приведенного ниже сценария мои вопросы:
В-1: Поскольку эти запросы не возвращают данные вФрейм данных на драйвере, но создайте временную таблицу, я предполагаю, что все операции выполняются на узлах-исполнителях и не должны беспокоиться о памяти драйвера / главного узла.Правильно ли я в этом предположении ??
Q-2: действительно ли мне нужно вызвать action на шаге 4 для вызова запроса?
Q-3: следующий запрос создает таблицу с левойобъединить причину из-за ошибки памяти, и я не могу увеличить память дальше.Если что-то не помещается в память, спарк должен записать на жесткий диск, верно?если да, есть ли подсказка, которая может помочь гарантировать успешность операций?
Примечание. Этот запрос отлично работает на кусте.
create table test_final stored as parquet as
select * from stg_test1 w
left join (
select * from test1
where strt_dt <= '${hiveconf:mon_lst_d}' or strt_dt is null
) l
on w.k1 = l.k1 and w.k2 = l.k2 and w.k3 = l.k3
left join (
select * from test2
where strt_dt <= '${hiveconf:mon_lst_d}' or strt_dt is null
) e
on w.k1 = e.k1 and w.k2 = e.k2 and l.k4 = e.k4
left join (
select * from test3
where strt_dt <= '${hiveconf:mon_lst_d}' or strt_dt is null
) c
on w.k1 = c.k1 and w.k2 = c.k2 and w.k5 = c.k5
left join (
select * from test4
where strt_dt <= '${hiveconf:mon_lst_d}' or strt_dt is null
) j
on w.k1 = j.k1 and w.k2 = j.k2 and w.k6 = j.k6
left join (
select * from test5
where strt_dt <= '${hiveconf:mon_lst_d}' or strt_dt is null
) o
on w.k1 = o.k1 and w.k2 = o.k2 and w.k7 = o.k7
left join (
select * from test6
where strt_dt <= '${hiveconf:mon_lst_d}' or strt_dt is null
) s
on w.k1 = s.k1 and w.k2= s.k2 and w.k8 = s.8 and w.k9 = s.k9 and w.k10 = s.k10
Вот мой запрос на отправку
from optparse import OptionParser
import re, subprocess, sys,os
executorMemory = "8G"
executeCores = "3"
numExecutors = "100"
memoryOverhead = "8192"
def run_spark_submit(sqlfile,ap_month, hr_extract_month, spark_params=None):
rc = None
print os.getcwd()
sqlfileLocation = '../hive/'+sqlfile
if not spark_params:
spark_params = [
"--master", "yarn-client",
"--num-executors", numExecutors,
"--executor-memory", executorMemory,
"--executor-cores", executeCores,
"--conf", "spark.yarn.executor.memoryOverhead="+memoryOverhead,
"--files", sqlfileLocation,
"--py-files", "../spark/utils.py"
]
print "spark-submit %s ../spark/run_spark_query.py -f %s -m %s -i %s" % (" ".join(spark_params), sqlfile, ap_month, hr_extract_month)
rc = subprocess.call("spark-submit %s ../spark/run_spark_query.py -f %s -m %s -i %s" % (" ".join(spark_params), sqlfile, ap_month, hr_extract_month),shell=True)
if rc:
print "spark submit failed for sqlfile=%s ap_month:%s hr_extract_month=%s" % (sqlfile, ap_month, hr_extract_month)
sys.exit(-1)