Выполняет ли создание таблицы из оператора select какую-либо операцию с драйвером / главным узлом в spark? - PullRequest
0 голосов
/ 24 октября 2018

У меня около 20-30 запросов в кусте на CDH 5.13.Чтобы оценить производительность, когда тот же запрос выполняется на спринге, я написал код, который читает запрос из файла и выполняет их на спарке, используя HiveContext.

Мой код Python, который выполняет запрос, выглядит примерно так:

   #step-1: readQueryFromFile is custom function that reads file uploaded via spark-submit and accessible to this driver code ad finalQuery is printed
   finalQuery = readQueryFromFile(sqlFile,dict)
   print finalQuery
   # step-2: prepare context
   sparkCtx = SparkContext.getOrCreate(SparkConf())
   hc = HiveContext(sparkCtx)
   # step-3: run sql
   rdd = hc.sql(finalQuery)
   # step-4: called explicit action
   rdd.show()

Все запросы в следующем формате.

create table [tablename] stored as parquet as select * from stg_[tablename]

С учетом приведенного ниже сценария мои вопросы:

В-1: Поскольку эти запросы не возвращают данные вФрейм данных на драйвере, но создайте временную таблицу, я предполагаю, что все операции выполняются на узлах-исполнителях и не должны беспокоиться о памяти драйвера / главного узла.Правильно ли я в этом предположении ??

Q-2: действительно ли мне нужно вызвать action на шаге 4 для вызова запроса?

Q-3: следующий запрос создает таблицу с левойобъединить причину из-за ошибки памяти, и я не могу увеличить память дальше.Если что-то не помещается в память, спарк должен записать на жесткий диск, верно?если да, есть ли подсказка, которая может помочь гарантировать успешность операций?

Примечание. Этот запрос отлично работает на кусте.

create table test_final stored as parquet as 
select * from stg_test1 w
 left join (
            select * from test1
            where strt_dt <= '${hiveconf:mon_lst_d}' or strt_dt is null
        ) l
        on w.k1 = l.k1 and w.k2 = l.k2 and w.k3 = l.k3
        left join (
            select * from test2
            where strt_dt <= '${hiveconf:mon_lst_d}' or strt_dt is null
        ) e
        on w.k1 = e.k1 and w.k2 = e.k2 and l.k4 = e.k4
        left join (
            select * from test3
            where strt_dt <= '${hiveconf:mon_lst_d}' or strt_dt is null
        ) c
        on w.k1 = c.k1 and w.k2 = c.k2 and w.k5 = c.k5
        left join (
            select * from test4
            where strt_dt <= '${hiveconf:mon_lst_d}' or strt_dt is null
        ) j
        on w.k1 = j.k1 and w.k2 = j.k2 and w.k6 = j.k6
        left join (
            select * from test5
            where strt_dt <= '${hiveconf:mon_lst_d}' or strt_dt is null
        ) o
        on w.k1 = o.k1 and w.k2 = o.k2 and w.k7 = o.k7
        left join (
            select * from test6
            where strt_dt <= '${hiveconf:mon_lst_d}' or strt_dt is null
        ) s
        on w.k1 = s.k1 and w.k2= s.k2 and w.k8 = s.8 and w.k9 = s.k9 and w.k10 = s.k10

Вот мой запрос на отправку

from optparse import OptionParser
import re, subprocess, sys,os

executorMemory = "8G"
executeCores = "3"
numExecutors = "100"
memoryOverhead = "8192"
def run_spark_submit(sqlfile,ap_month, hr_extract_month, spark_params=None):
    rc = None
    print os.getcwd()

    sqlfileLocation = '../hive/'+sqlfile
    if not spark_params:
        spark_params = [
            "--master", "yarn-client",
            "--num-executors", numExecutors,
            "--executor-memory", executorMemory,
            "--executor-cores", executeCores,
            "--conf", "spark.yarn.executor.memoryOverhead="+memoryOverhead,
            "--files", sqlfileLocation,
            "--py-files", "../spark/utils.py"
        ]
    print "spark-submit %s ../spark/run_spark_query.py -f %s -m %s -i %s" % (" ".join(spark_params), sqlfile, ap_month, hr_extract_month)
    rc = subprocess.call("spark-submit %s ../spark/run_spark_query.py -f %s -m %s -i %s" % (" ".join(spark_params), sqlfile, ap_month, hr_extract_month),shell=True)
    if rc:
        print "spark submit failed for sqlfile=%s ap_month:%s hr_extract_month=%s" % (sqlfile, ap_month, hr_extract_month)
        sys.exit(-1)
...