Подключение Redshift с помощью sparklyr или sparkR? - PullRequest
0 голосов
/ 20 февраля 2019

Я пытаюсь понять, как подключить R к красному смещению с помощью spark, я не могу подключиться с помощью простого RPostgres, поскольку этот набор данных огромен и требует распределенных вычислений,

, пока я могу читать и писатьCSV из s3 в искровой фрейм данных, может кто-нибудь показать, как настроить jar и другие вещи, чтобы я мог подключить SparklyR (spark_read_jdbc ()) или sparkR к красному смещению.

Также было бы полезно, если вы можете показать, какдобавить jar-файлы в sparkContexts

До сих пор я выяснил, что в блоках данных есть несколько jar-файлов, необходимых для доступа к URL-адресу jdbc для db красного смещения.

1 Ответ

0 голосов
/ 17 марта 2019
rm(list=ls())
library(sparklyr)
#library(SparkR)
#detach('SparkR')
Sys.setenv("SPARK_MEM" = "15G")
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "8G"
config$`sparklyr.shell.executor-memory` <- "8G"
config$`spark.yarn.executor.memoryOverhead` <- "6GB"
config$`spark.dynamicAllocation.enabled`   <- "TRUE"
config$`sparklyr.shell.driver-java-options`<-list("driver-class-path" ="/home/root/spark/spark-2.1.0-bin-hadoop2.7/jars/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar")
spark_dir = "/tmp/spark_temp"
config$`sparklyr.shell.driver-java-options` <-  paste0("-Djava.io.tmpdir=", spark_dir)
sc <- spark_connect(master = "local[*]", config = config)
#sc <- spark_connect(master = "local")

###invoke the spark context 
ctx <- sparklyr::spark_context(sc)
#Use below to set the java spark context ##"org.apache.spark.api.java.JavaSparkContext"
####
jsc <- sparklyr::invoke_static( sc, "org.apache.spark.api.java.JavaSparkContext", "fromSparkContext",ctx )
##invoke the hadoop context 
hconf <- jsc %>% sparklyr::invoke("hadoopConfiguration")
#hconf %>%    invoke("set","fs.s3a.access.key","<your access key for s3 >")  

hconf %>%    sparklyr::invoke("set","fs.s3a.access.key","<your access key for s3>")  
hconf %>% sparklyr::invoke("set","fs.s3a.secret.key", "<your secret key for s3>")   
hconf%>% sparklyr::invoke("set","fs.s3a.endpoint", "<your region of s3 bucket>") 

hconf %>% sparklyr::invoke("set","com.amazonaws.services.s3.enableV4", "true") 
hconf %>% sparklyr::invoke("set","spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")


hconf %>% sparklyr::invoke("set","fs.s3a.impl.disable.cache", "true") 



?spark_read_csv

###reading from s3 buckets 
spark_read_csv(sc=sc,name='sr',path="s3a://my-bucket/tmp/2district.csv",memory = TRUE)
spark_read_csv(sc=sc,name='sr_disk3',path="s3a://my-bucket/tmp/changed/",memory = FALSE)
###reading from local drive 
spark_read_csv(sc=sc,name='raw_data_loc_in3',path="/tmp/distance.csv",memory = TRUE)
spark_read_csv(sc=sc,name='raw_data_loc_in5',path="/tmp/distance.csv",memory = TRUE)









####reading from redshift table 
t<-sparklyr::spark_read_jdbc(sc, "connection",  options = list(
  url = "jdbc:redshift://<URL>:<Port>/<dbName>",
  user = "<user_name>",
  password = "<password>",
  dbtable='(Select * from sales limit 1000)',
  tempS3Dir = "s3a://my-bucket/migration"),memory = T,overwrite = T,repartition = 3)

####write rdd to csv in local
sparklyr::spark_write_csv(t,path='sample.csv')
####write rdd to csv in local
sparklyr::spark_write_csv(t,path='s3a://my-bucket/output/')
...