Смена метаданных URI в Spark - PullRequest
0 голосов
/ 12 декабря 2018

В моей работе есть разные среды (разработка, подготовка производства и производства), и в каждой среде у нас есть определенные таблицы в метасольве Hive.У моего пользователя есть разрешения для доступа и запроса всех этих метастазов через beeline, но я хочу получить доступ к этим метасторам в сеансе spark-shell, используя sqlContext (или HiveContext).

Например, когда я получаю доступ к среде Preproductionс помощью ssh, и если я запускаю сеанс spark-shell, он автоматически создает переменную sqlContext, с помощью которой я могу выполнять запросы к метастору Preproduction.

Я также могу выполнять запросы к метастажу Production из метастажа Preproduction с помощью beeline, поэтому япопытался изменить некоторые настройки в Hive ( Как программно подключиться к метасольве Hive в SparkSQL? ).Я изменил следующие свойства:

hive.metastore.uris и hive.server2.authentication.kerberos.principal на соответствующие свойства в производственной среде.

Мой код в спарк-оболочке:

   System.setProperty("hive.server2.authentication.kerberos.principal","hive/URL@URL2")
    System.setProperty("hive.metastore.uris","thrift://URLTOPRODUCTION:9083")
    import org.apache.spark.sql.hive.HiveContext
    val hive=new HiveContext(sc)
    val df=hive.sql("SELECT * FROM DB.Table limit 10")

Но когда я выполняю последнее предложение предыдущего блока кода, я получаю следующую ошибку.

java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1

    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:406)

    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)

    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)

    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:762)

    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:693)

    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:158)

    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)

    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)

    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)

    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)

    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)

    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)

    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$1.apply(interfaces.scala:449)

    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$1.apply(interfaces.scala:447)

    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)

    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)

    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)

    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477)

    at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:489)

    at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:487)

    at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:398)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)

    at scala.Option.getOrElse(Option.scala:120)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:202)

    at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)

    at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)

    at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:39)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$12.apply(HiveMetastoreCatalog.scala:504)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$12.apply(HiveMetastoreCatalog.scala:503)

    at scala.Option.getOrElse(Option.scala:120)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$convertToParquetRelation(HiveMetastoreCatalog.scala:503)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:565)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:545)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)

    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)

    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

    at scala.collection.Iterator$class.foreach(Iterator.scala:727)

    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)

    at scala.collection.AbstractIterator.to(Iterator.scala:1157)

    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)

    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

    at scala.collection.Iterator$class.foreach(Iterator.scala:727)

    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)

    at scala.collection.AbstractIterator.to(Iterator.scala:1157)

    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$.apply(HiveMetastoreCatalog.scala:545)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$.apply(HiveMetastoreCatalog.scala:539)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)

    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)

    at scala.collection.immutable.List.foldLeft(List.scala:84)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)

    at scala.collection.immutable.List.foreach(List.scala:318)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)

    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:37)

    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:37)

    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:35)

    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)

    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)

    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:829)

    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)

    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)

    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)

    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)

    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:41)

    at $iwC$$iwC$$iwC.<init>(<console>:43)

    at $iwC$$iwC.<init>(<console>:45)

    at $iwC.<init>(<console>:47)

    at <init>(<console>:49)

    at .<init>(<console>:53)

    at .<clinit>(<console>)

    at .<init>(<console>:7)

    at .<clinit>(<console>)

    at $print(<console>)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)

    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)

    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)

    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)

    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)

    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)

    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)

    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)

    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)

    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)

    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)

    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)

    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)

    at org.apache.spark.repl.Main$.main(Main.scala:35)

    at org.apache.spark.repl.Main.main(Main.scala)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)

    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)

    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)

    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)

    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

   Caused by: java.net.UnknownHostException: nameservice1

    ... 141 more

I 'используя дистрибутив Cloudera с Spark 1.6.0 и Scala 2.10.5.

Есть идеи для его решения?Заранее спасибо

1 Ответ

0 голосов
/ 17 декабря 2018

Наконец, после того, как я рассмотрел конфигурацию переменной sqlContext, которую spark-shell создает автоматически на сервере, я увидел, что существует множество URL-адресов и переменных конфигурации, и что у меня нет разрешений в HDFS или другихсерверы, которые мне нужны для выполнения запросов в метастор-хранилище PROD.

Поскольку я знаю, что запрос метастор-хранилища PROD с помощью beeline работает, я знаю, что могу запрашивать это метасторье через JDBC, поэтому я взял URL-адрес JDBC вызова beeline.

Затем я использую этот URL-адрес JDBC и начинаю использовать собственные методы и утилиты Java (из Scala) для подключения к БД через JDBC:

/*We will need hive-jdbc-0.14.0.jar to connect to a Hive metastore via JDBC */
import java.sql.ResultSetMetaData
import java.sql.{DriverManager, Connection, Statement, ResultSet}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
/* In the following lines I connect to Prod Metastore via JDBC and I execute the query as if I am connecting to a simple DB. Notice that, using this method, you are not using distributed computing */
val url="jdbc:hive2://URL_TO_PROD_METASTORE/BD;CREDENTIALS OR URL TO KERBEROS"
val query="SELECT * FROM BD.TABLE LIMIT 100"
val driver="org.apache.hive.jdbc.HiveDriver"
Class.forName(driver).newInstance
val conn: Connection = DriverManager.getConnection(url)
val r: ResultSet = conn.createStatement.executeQuery(query)
val list =scala.collection.mutable.MutableList[Row]()
/* Now we want to get all the values from all the columns. Notice that I creat a ROW object for each row of the results. Then I add each Row to a MutableList*/
while(r.next()){
  var value : Array[String] = new Array[String](r.getMetaData.getColumnCount())
  for(i<-1 to r.getMetaData.getColumnCount()){
  value(i-1) = r.getString(i)}
  list+=Row.fromSeq(value)}

/* Now we have the results of the query to PROD metastore and we want to transform this data to a Dataframe so we have to create a StructType with the names of the columns and we also need a list of rows with previous results */
var array : Array[StructField] = new Array[StructField] (r.getMetaData.getColumnCount())
for(i<- 1 to r.getMetaData.getColumnCount){
 array(i-1) =StructField(r.getMetaData.getColumnName(i),StringType)}
val struct=StructType(array)
val rdd=sc.parallelize(list)
val df=sqlContext.createDataFrame(rdd,struct)
r.close
conn.close

Обратите внимание, что этот вопрос относится к одному измои другие ответы.Потому что лучший способ экспорта результатов запроса Hive в CSV - это использование Spark ( Как экспортировать таблицу Hive в файл CSV? ).Для этого я хочу запросить Prost metastore из сеанса Spark на PRE-сервере.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...