Как я могу использовать SparkSQL и его механизм выполнения для запроса баз данных и таблиц Hive без вызова какой-либо части механизма выполнения Hive? - PullRequest
0 голосов
/ 03 октября 2018

Я создал операторы select и join, которые я могу запустить из Hive CLI и / или beeline CLI и / или Spark (2.3.1) WITH enableHiveSupport=TRUE.(Примечание: я использую SparkR для своего API)

Объединение и запись с использованием beeline занимает 30 минут, но объединение и запись с использованием Spark с enableHiveSupport=TRUE занимает 3,5 ЧАСА.Это либо означает, что Spark и его разъемы - дерьмо, либо я не использую спарк, каким я должен быть ... и все, что я прочитал о комментариях Spark «Лучшая вещь из нарезанного хлеба», означает, что я, вероятно, неправильно его использую.

Я хочу читать из таблиц Hive, но я не хочу, чтобы Hive что-то делал.Я хотел бы запускать объединения по ежемесячным данным, запускать регрессию для ежемесячной дельты каждой записи, а затем выводить мои окончательные уклоны / бета-версии в выходную таблицу в паркете, которую можно прочитать из Hive, если необходимо ... желательно разбить на части так же, какЯ разделил таблицы, которые я использую в качестве входных данных из Hive.

Вот некоторый код в соответствии с просьбой ... но я не думаю, что вы собираетесь чему-то научиться.Вы не получите воспроизводимых результатов с запросами больших данных.

Sys.setenv(SPARK_HOME="/usr/hdp/current/spark2-client")
sessionInfo()
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.stop()
Sys.setenv(SPARKR_SUBMIT_ARGS="--master yarn sparkr-shell") #--master yarn-client sparkr-shell
Sys.setenv(LOCAL_DIRS="/tmp")
config = list()
config$spark.cores.max <- 144L
config$spark.executor.cores <- 2L
config$spark.executor.memory <- '8g'
config$spark.driver.cores <- 6L
config$spark.driver.maxResultSize <-"0"
config$spark.driver.memory <- "32g"
config$spark.shuffle.service.enabled<-TRUE
config$spark.dynamicAllocation.enabled <-FALSE
config$spark.scheduler.mode <- 'FIFO'
config$spark.ui.port<-4044L
sparkR.session(master = "yarn",
           sparkHome = Sys.getenv("SPARK_HOME"),
           sparkConfig = config,
           enableHiveSupport = TRUE)
print("Connected!")

############ SET HIVE CONFIG 
collect(sql("SET hive.exec.dynamic.partition") )
sql("SET hive.exec.dynamic.partition=true")
collect(sql("SET hive.exec.dynamic.partition.mode"))
sql("SET hive.exec.dynamic.partition.mode=nonstrict")
##
start_time <- Sys.time()
############### READ IN DATA {FROM HIVE} 
sql('use historicdata')
data_tables<-collect(sql('show tables'))
exporttabs <- grep(pattern = 'export_historic_archive_records',x = data_tables$tableName,value = TRUE)
jointabs<-sort(exporttabs)[length(exporttabs)-(nMonths-1):0]
currenttab<-jointabs[6]

############### CREATE TABLE AND INSERT SCRIPTS 
sql(paste0('use ',hivedb))
sql(paste0('DROP TABLE IF EXISTS histdata_regression',tab_suffix))
sSelect<-paste0("Insert Into TABLE histdata_regression",tab_suffix," partition (scf) SELECT a.idkey01, a.ssn7")
sCreateQuery<-paste0("CREATE TABLE histdata_regression",tab_suffix," (idkey01 string, ssn7 string")
sFrom<-paste0("FROM historicdata.",jointabs[nMonths]," a")
sAlias<-letters[nMonths:1]
DT <- gsub(pattern = "export_historic_archive_records_",replacement = "",jointabs)
DT<-paste0(DT)
for (i in nMonths:1) {
  sSelect<-paste0(sSelect,", ",sAlias[i],".",hdAttr," as ",hdAttr,"_",i,", ",sAlias[i],".recordid as recordid_",DT[i])
  sCreateQuery<-paste0(sCreateQuery,", ",hdAttr,"_",i," int, recordid_",DT[i]," int")
  if (i==1) sCreateQuery<-paste0(sCreateQuery,') PARTITIONED BY (scf string) STORED AS ORC')
  if (i==1) sSelect<-paste0(sSelect,", a.scf")
  if (i!=nMonths) sFrom<-paste0(sFrom," inner join historicdata.",jointabs[i]," ",sAlias[i]," on ",
                              paste(paste0(paste0("a.",c("scf","idkey01","ssn7")),"=",
                                     paste0(sAlias[i],".",c("scf","idkey01","ssn7"))),collapse=" AND "))
}

system(paste0('beeline -u "jdbc:hive2://myserver1.com,myserver2.com,myserver3.com,myserver4.com,myserver5.com/work;\
            serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "',sCreateQuery,'"'))

system(paste0("beeline -u \"jdbc:hive2://myserver1.com,myserver2.com,myserver3.com,myserver4.com,myserver5.com/work;\
            serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2\" -e \"",sSelect," ",sFrom,"\""))
...