искра, имеющая отношение к карбондате - PullRequest
0 голосов
/ 17 мая 2018

Ниже приведен фрагмент кода, который я пытаюсь использовать для создания таблицы карбондата в S3. Однако, несмотря на установку учетных данных aws в hadoopconfiguration, он по-прежнему жалуется на то, что секретный ключ и ключ доступа не установлены. В чем здесь проблема?

 import org.apache.spark.sql.CarbonSession._
 import org.apache.spark.sql.CarbonSession._
 val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("s3n://url")
carbon.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId","<accesskey>")
   carbon.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey","<secretaccesskey>")
carbon.sql("CREATE TABLE IF NOT EXISTS test_table(id string,name string,city string,age Int) STORED BY 'carbondata'")

Последняя команда выдает ошибку:

java.lang.IllegalArgumentException: идентификатор ключа доступа AWS и секрет Ключ доступа должен быть указан как имя пользователя или пароль (соответственно) s3n-URL или установкой fs.s3n.awsAccessKeyId или свойства fs.s3n.awsSecretAccessKey (соответственно)

Spark Version : 2.2.1
Command used to start spark-shell:
$SPARK_PATH/bin/spark-shell --jars /localpath/jar/apache-carbondata-1.3.1-bin-spark2.2.1-hadoop2.7.2/apache-carbondata-1.3.1-bin-spark2.2.1-hadoop2.7.2.jar,/localpath/jar/spark-avro_2.11-4.0.0.jar --packages com.amazonaws:aws-java-sdk-pom:1.9.22,org.apache.hadoop:hadoop-aws:2.7.2,org.slf4j:slf4j-simple:1.7.21,asm:asm:3.2,org.xerial.snappy:snappy-java:1.1.7.1,com.databricks:spark-avro_2.11:4.0.0

UPDATE:

Обнаружено, что поддержка S3 доступна только в 1.4.0 RC1. Поэтому я собрал RC1 и проверил приведенный ниже код на то же самое. Но все же я, кажется, сталкиваюсь с проблемами. Любая помощь приветствуется. Код:

import org.apache.spark.sql.CarbonSession._
import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.constants.CarbonCommonConstants
object sample4 {
def main(args: Array[String]) {
val (accessKey, secretKey, endpoint) = getKeyOnPrefix("s3n://")
//val rootPath = new File(this.getClass.getResource("/").getPath
//                            + "../../../..").getCanonicalPath
val path = "/localpath/sample/data1.csv"
val spark = SparkSession
      .builder()
      .master("local")
      .appName("S3UsingSDKExample")
      .config("spark.driver.host", "localhost")
      .config(accessKey, "<accesskey>")
      .config(secretKey, "<secretkey>")
      //.config(endpoint, "s3-us-east-1.amazonaws.com")
      .getOrCreateCarbonSession()
      spark.sql("Drop table if exists carbon_table")

    spark.sql(
      s"""
         | CREATE TABLE if not exists carbon_table(
         | shortField SHORT,
         | intField INT,
         | bigintField LONG,
         | doubleField DOUBLE,
         | stringField STRING,
         | timestampField TIMESTAMP,
         | decimalField DECIMAL(18,2),
         | dateField DATE,
         | charField CHAR(5),
         | floatField FLOAT
         | )
         | STORED BY 'carbondata'
         | LOCATION 's3n://bucketName/table/carbon_table'
         | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField')
       """.stripMargin)

}


def getKeyOnPrefix(path: String): (String, String, String) = {
    val endPoint = "spark.hadoop." + ENDPOINT
    if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
      ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
    } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
      ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
        "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
    } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
      ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
        "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
    } else {
      throw new Exception("Incorrect Store Path")
    }
  }
  def getSparkMaster(args: Array[String]): String = {
    if (args.length == 6) args(5)
    else if (args(3).contains("spark:") || args(3).contains("mesos:")) args(3)
    else "local"
  }
}

Ошибка:

18/05/17 12:23:22 ERROR SegmentStatusManager: main Failed to read metadata of load
org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.ServiceException: Request Error: Empty key

Я также пробовал использовать пример кода в (пробовал протоколы s3, s3n, s3a):

https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala

Ран как:

S3Example.main (Array ( "Accesskey", "SecretKey", "s3: // bucketName / путь / carbon_table", "https://bucketName.s3.amazonaws.com","local"))

Ошибка трассировки стека:

org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: ошибка запроса: пустой ключ в org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get (Jets3tFileSystemStore.java:175) в org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode (Jets3tFileSystemStore.java:221) at sun.reflect.GeneratedMethodAccessor42.invoke (неизвестный источник) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:498) в org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod (RetryInvocationHandler.java:191) в org.apache.hadoop.io.retry.RetryInvocationHandler.invoke (RetryInvocationHandler.java:102) в com.sun.proxy. $ Proxy21.retrieveINode (неизвестный источник) в org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus (S3FileSystem.java:340) в org.apache.hadoop.fs.FileSystem.exists (FileSystem.java:1426) в org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.isFileExist (AbstractDFSCarbonFile.java:426) в org.apache.carbondata.core.datastore.impl.FileFactory.isFileExist (FileFactory.java:201) в org.apache.carbondata.core.statusmanager.SegmentStatusManager.readTableStatusFile (SegmentStatusManager.java:246) в org.apache.carbondata.core.statusmanager.SegmentStatusManager.readLoadMetadata (SegmentStatusManager.java:197) в org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache (ManageDictionaryAndBTree.java:101) в org.apache.spark.sql.hive.CarbonFileMetastore.dropTable (CarbonFileMetastore.scala: 460) в org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand.processMetadata (CarbonCreateTableCommand.scala: 148) в org.apache.spark.sql.execution.command.MetadataCommand.run (package.scala: 68) в org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult $ lzycompute (commands.scala: 58) в org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult (commands.scala: 56) в org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect (commands.scala: 67) в org.apache.spark.sql.Dataset. (Dataset.scala: 183) в org.apache.spark.sql.CarbonSession $$ anonfun $ SQL $ 1.Apply (CarbonSession.scala: 107) в org.apache.spark.sql.CarbonSession $$ anonfun $ SQL $ 1.Apply (CarbonSession.scala: 96) в org.apache.spark.sql.CarbonSession.withProfiler (CarbonSession.scala: 144) в org.apache.spark.sql.CarbonSession.sql (CarbonSession.scala: 94) в $ line19. $ read $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $ S3Example $ .main (: 68) в $ line26. $ read $$ iw $$ iw $$ IW $$ $$ IW IW IW $$ $$ $$ IW IW. (: 31) в $ line26. $ читать $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw. (: 36) в $ line26. $ read $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw. (: 38) в$ line26. $ read $$ iw $$ iw $$ iw $$ iw $$ iw. (: 40) в $ line26. $ read $$ iw $$ iw $$ iw $$ iw. (: 42) в $line26. $ read $$ iw $$ iw $$ iw. (: 44) в $ line26. $ read $$ iw $$ iw. (: 46) в $ line26. $ read $$ iw. (: 48) в$ line26. $ read. (: 50) в $ line26. $ read $. (: 54) в $ line26. $ read $. () в $ line26. $ eval $. $ print $ lzycompute (: 7) в $line26. $ eval $. $ print (: 6) в $ line26. $ eval. $ print () в sun.reflect.NativeMethodAccessorImpl.invoke0 (собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:)в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:498) в scala.tools.nsc.interpreter.IMain $ ReadEvalPrintcall (: 786) на scala.tools.nsc.interpreter.IMain $ Request.loadAndRun (IMain.scala: 1047) на scala.tools.nsc.interpreter.IMain $ WrappedRequest $$ anonfun $ loadAndRunReq $ 1.apply (IMain.scala: 638) в scala.tools.nsc.interpreter.IMain $ WrappedRequest $$ anonfun $ loadAndRunReq $ 1.apply (IMain.scala: 637) в scala.reflect.internal.util.ScalaClassLoader $ class.asContext (ScalaClassLoader.scala: 31) в scala.reflect.internal.util.AbstractFileClassLoader.asContext (AbstractFileClassLoader.scala: 19) в scala.tools.nsc.interpreter.IMain $ WrappedReunest.Ru637) на scala.tools.nsc.interpreter.IMain.interpret (IMain.scala: 569) на scala.tools.nsc.interpreter.IMain.interpret (IMain.scala: 565) на scala.tools.nsc.interpreter.ILoop.interpretStartingWith (ILoop.scala: 807) в scala.tools.nsc.interpreter.ILoop.command (ILoop.scala: 681) в scala.tools.nsc.interpreter.ILoop.processLine (ILoop.scala: 395) в scala.tools.nsc.interpreter.ILoop.loop (ILoop.scala: 415) в scala.tools.nsc.interpreter.ILoop $$ anonfun $ process $ 1.apply $ mcZ $ sp (ILoop.scala: 923) в scala.tools.nsc.interpreter.ILoop $$ anonfun $ process $ 1.apply (ILoop.scala: 909) в scala.tools.nsc.interpreter.ILoop $$ anonfun $ process $ 1.apply (ILoop.scala: 909) в scala.reflect.internal.util.ScalaClassLoader $ .savingContextLoader (ScalaClassLoader.scala: 97) в scala.tools.nsc.interpreter.ILoop.process (ILoop.scala: 909) в org.apache.spark.repl.Main $ .doMain (Main.scala: 74) в org.apache.spark.repl.Main $ .main (Main.scala:54) в org.apache.spark.repl.Main.main (Main.scala) в sun.reflect.NativeMethodAccessorImpl.invoke0 (родной метод) в sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.jre:lect).DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:498) в org.apache.spark.deploy.SparkSubmit $ .org $ spache $ spark $$ runMain (SparkSubmit.scala: 775) в org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1 (SparkSubmit.scala: 180) в org.apache.spark.deploy.SparkSubmit $ .submit (SparkSubmit.scala: 205)в org.apache.spark.deploy.SparkSubmit $ .main (SparkSubmit.scala: 119) в org.apache.spark.deploy.SparkSubmit.main (SparkSubmit.scala) Причина: org.jets3t.service.S3ServiceException: ошибка запроса: Пустой ключ в org.jets3t.service.S3Service.getObject (S3Service.java:1470) в org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get (Jets3tFileSystemStore.java:163)

Любой из аргументов, которые я передаю неправильно.Я могу получить доступ к пути s3, используя aws cli:

aws s3 ls s3: // bucketName / path

существует в S3.

Ответы [ 2 ]

0 голосов
/ 26 мая 2018

привет викас, глядя на пустой ключ вашего исключения, просто означает, что ваш ключ acesss и секретный ключ не связаны в сеансе carbon, потому что когда мы даем реализацию s3, мы пишем логику, что если какой-либо из ключей не предоставлен пользователем, то тогда он их значение должно быть взято как пустое

чтобы все было легко сначала создайте банку данных углерода, используя эту команду

Чистый пакет mvn -Pspark-2.1 затем выполните команду spark submit с этой командой

. / Spark-submit --jars file: ///home/anubhav/Downloads/softwares/spark-2.2.1-bin-hadoop2.7/carbonlib/apache-carbondata-1.4.0-SNAPSHOT-bin- spark2.2.1-hadoop2.7.2.jar --class org.apache.carbondata.examples.S3Example /home/anubhav/Documents/carbondata/carbondata/carbondata/examples/spark2/target/carbondata-examples-spark2-1.4.0- SNAPSHOT.jar local

замени мой путь к банке своим и посмотри, как он должен работать, у меня это работает

0 голосов
/ 17 мая 2018

Вы можете попробовать его, используя этот пример https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala

Вы должны предоставить свойства учетных данных aws, чтобы спровоцировать сначала, после этого вы будете создавать carbonSession.

Если вы уже создали sparkContext безAWS свойства предоставляются.Тогда он не получит эти свойства даже после того, как вы передадите его в carbonContext.

...