Ниже приведен фрагмент кода, который я пытаюсь использовать для создания таблицы карбондата в S3. Однако, несмотря на установку учетных данных aws в hadoopconfiguration, он по-прежнему жалуется на то, что секретный ключ и ключ доступа не установлены. В чем здесь проблема?
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.CarbonSession._
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("s3n://url")
carbon.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId","<accesskey>")
carbon.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey","<secretaccesskey>")
carbon.sql("CREATE TABLE IF NOT EXISTS test_table(id string,name string,city string,age Int) STORED BY 'carbondata'")
Последняя команда выдает ошибку:
java.lang.IllegalArgumentException: идентификатор ключа доступа AWS и секрет
Ключ доступа должен быть указан как имя пользователя или пароль
(соответственно) s3n-URL или установкой fs.s3n.awsAccessKeyId
или свойства fs.s3n.awsSecretAccessKey (соответственно)
Spark Version : 2.2.1
Command used to start spark-shell:
$SPARK_PATH/bin/spark-shell --jars /localpath/jar/apache-carbondata-1.3.1-bin-spark2.2.1-hadoop2.7.2/apache-carbondata-1.3.1-bin-spark2.2.1-hadoop2.7.2.jar,/localpath/jar/spark-avro_2.11-4.0.0.jar --packages com.amazonaws:aws-java-sdk-pom:1.9.22,org.apache.hadoop:hadoop-aws:2.7.2,org.slf4j:slf4j-simple:1.7.21,asm:asm:3.2,org.xerial.snappy:snappy-java:1.1.7.1,com.databricks:spark-avro_2.11:4.0.0
UPDATE:
Обнаружено, что поддержка S3 доступна только в 1.4.0 RC1. Поэтому я собрал RC1 и проверил приведенный ниже код на то же самое. Но все же я, кажется, сталкиваюсь с проблемами. Любая помощь приветствуется.
Код:
import org.apache.spark.sql.CarbonSession._
import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.constants.CarbonCommonConstants
object sample4 {
def main(args: Array[String]) {
val (accessKey, secretKey, endpoint) = getKeyOnPrefix("s3n://")
//val rootPath = new File(this.getClass.getResource("/").getPath
// + "../../../..").getCanonicalPath
val path = "/localpath/sample/data1.csv"
val spark = SparkSession
.builder()
.master("local")
.appName("S3UsingSDKExample")
.config("spark.driver.host", "localhost")
.config(accessKey, "<accesskey>")
.config(secretKey, "<secretkey>")
//.config(endpoint, "s3-us-east-1.amazonaws.com")
.getOrCreateCarbonSession()
spark.sql("Drop table if exists carbon_table")
spark.sql(
s"""
| CREATE TABLE if not exists carbon_table(
| shortField SHORT,
| intField INT,
| bigintField LONG,
| doubleField DOUBLE,
| stringField STRING,
| timestampField TIMESTAMP,
| decimalField DECIMAL(18,2),
| dateField DATE,
| charField CHAR(5),
| floatField FLOAT
| )
| STORED BY 'carbondata'
| LOCATION 's3n://bucketName/table/carbon_table'
| TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField')
""".stripMargin)
}
def getKeyOnPrefix(path: String): (String, String, String) = {
val endPoint = "spark.hadoop." + ENDPOINT
if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
} else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
"spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
} else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
"spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
} else {
throw new Exception("Incorrect Store Path")
}
}
def getSparkMaster(args: Array[String]): String = {
if (args.length == 6) args(5)
else if (args(3).contains("spark:") || args(3).contains("mesos:")) args(3)
else "local"
}
}
Ошибка:
18/05/17 12:23:22 ERROR SegmentStatusManager: main Failed to read metadata of load
org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.ServiceException: Request Error: Empty key
Я также пробовал использовать пример кода в (пробовал протоколы s3, s3n, s3a):
https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
Ран как:
S3Example.main (Array ( "Accesskey", "SecretKey", "s3: // bucketName / путь / carbon_table", "https://bucketName.s3.amazonaws.com","local"))
Ошибка трассировки стека:
org.apache.hadoop.fs.s3.S3Exception:
org.jets3t.service.S3ServiceException: ошибка запроса: пустой ключ в
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get (Jets3tFileSystemStore.java:175)
в
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode (Jets3tFileSystemStore.java:221)
at sun.reflect.GeneratedMethodAccessor42.invoke (неизвестный источник) в
sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
в java.lang.reflect.Method.invoke (Method.java:498) в
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod (RetryInvocationHandler.java:191)
в
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke (RetryInvocationHandler.java:102)
в com.sun.proxy. $ Proxy21.retrieveINode (неизвестный источник) в
org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus (S3FileSystem.java:340)
в org.apache.hadoop.fs.FileSystem.exists (FileSystem.java:1426) в
org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.isFileExist (AbstractDFSCarbonFile.java:426)
в
org.apache.carbondata.core.datastore.impl.FileFactory.isFileExist (FileFactory.java:201)
в
org.apache.carbondata.core.statusmanager.SegmentStatusManager.readTableStatusFile (SegmentStatusManager.java:246)
в
org.apache.carbondata.core.statusmanager.SegmentStatusManager.readLoadMetadata (SegmentStatusManager.java:197)
в
org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache (ManageDictionaryAndBTree.java:101)
в
org.apache.spark.sql.hive.CarbonFileMetastore.dropTable (CarbonFileMetastore.scala: 460)
в
org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand.processMetadata (CarbonCreateTableCommand.scala: 148)
в
org.apache.spark.sql.execution.command.MetadataCommand.run (package.scala: 68)
в
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult $ lzycompute (commands.scala: 58)
в
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult (commands.scala: 56)
в
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect (commands.scala: 67)
в org.apache.spark.sql.Dataset. (Dataset.scala: 183) в
org.apache.spark.sql.CarbonSession $$ anonfun $ SQL $ 1.Apply (CarbonSession.scala: 107)
в
org.apache.spark.sql.CarbonSession $$ anonfun $ SQL $ 1.Apply (CarbonSession.scala: 96)
в
org.apache.spark.sql.CarbonSession.withProfiler (CarbonSession.scala: 144)
в org.apache.spark.sql.CarbonSession.sql (CarbonSession.scala: 94) в
$ line19. $ read $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $ S3Example $ .main (: 68) в $ line26. $ read $$ iw $$ iw $$ IW $$ $$ IW IW IW $$ $$ $$ IW IW. (: 31)
в $ line26. $ читать $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw. (: 36) в
$ line26. $ read $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw. (: 38) в$ line26. $ read $$ iw $$ iw $$ iw $$ iw $$ iw. (: 40) в $ line26. $ read $$ iw $$ iw $$ iw $$ iw. (: 42) в $line26. $ read $$ iw $$ iw $$ iw. (: 44) в $ line26. $ read $$ iw $$ iw. (: 46) в $ line26. $ read $$ iw. (: 48) в$ line26. $ read. (: 50) в $ line26. $ read $. (: 54) в $ line26. $ read $. () в $ line26. $ eval $. $ print $ lzycompute (: 7) в $line26. $ eval $. $ print (: 6) в $ line26. $ eval. $ print () в sun.reflect.NativeMethodAccessorImpl.invoke0 (собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:)в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:498) в scala.tools.nsc.interpreter.IMain $ ReadEvalPrintcall (: 786) на scala.tools.nsc.interpreter.IMain $ Request.loadAndRun (IMain.scala: 1047) на scala.tools.nsc.interpreter.IMain $ WrappedRequest $$ anonfun $ loadAndRunReq $ 1.apply (IMain.scala: 638) в scala.tools.nsc.interpreter.IMain $ WrappedRequest $$ anonfun $ loadAndRunReq $ 1.apply (IMain.scala: 637) в scala.reflect.internal.util.ScalaClassLoader $ class.asContext (ScalaClassLoader.scala: 31) в scala.reflect.internal.util.AbstractFileClassLoader.asContext (AbstractFileClassLoader.scala: 19) в scala.tools.nsc.interpreter.IMain $ WrappedReunest.Ru637) на scala.tools.nsc.interpreter.IMain.interpret (IMain.scala: 569) на scala.tools.nsc.interpreter.IMain.interpret (IMain.scala: 565) на scala.tools.nsc.interpreter.ILoop.interpretStartingWith (ILoop.scala: 807) в scala.tools.nsc.interpreter.ILoop.command (ILoop.scala: 681) в scala.tools.nsc.interpreter.ILoop.processLine (ILoop.scala: 395) в scala.tools.nsc.interpreter.ILoop.loop (ILoop.scala: 415) в scala.tools.nsc.interpreter.ILoop $$ anonfun $ process $ 1.apply $ mcZ $ sp (ILoop.scala: 923) в scala.tools.nsc.interpreter.ILoop $$ anonfun $ process $ 1.apply (ILoop.scala: 909) в scala.tools.nsc.interpreter.ILoop $$ anonfun $ process $ 1.apply (ILoop.scala: 909) в scala.reflect.internal.util.ScalaClassLoader $ .savingContextLoader (ScalaClassLoader.scala: 97) в scala.tools.nsc.interpreter.ILoop.process (ILoop.scala: 909) в org.apache.spark.repl.Main $ .doMain (Main.scala: 74) в org.apache.spark.repl.Main $ .main (Main.scala:54) в org.apache.spark.repl.Main.main (Main.scala) в sun.reflect.NativeMethodAccessorImpl.invoke0 (родной метод) в sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.jre:lect).DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke (Method.java:498) в org.apache.spark.deploy.SparkSubmit $ .org $ spache $ spark $$ runMain (SparkSubmit.scala: 775) в org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1 (SparkSubmit.scala: 180) в org.apache.spark.deploy.SparkSubmit $ .submit (SparkSubmit.scala: 205)в org.apache.spark.deploy.SparkSubmit $ .main (SparkSubmit.scala: 119) в org.apache.spark.deploy.SparkSubmit.main (SparkSubmit.scala) Причина: org.jets3t.service.S3ServiceException: ошибка запроса: Пустой ключ в org.jets3t.service.S3Service.getObject (S3Service.java:1470) в org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get (Jets3tFileSystemStore.java:163)
Любой из аргументов, которые я передаю неправильно.Я могу получить доступ к пути s3, используя aws cli:
aws s3 ls s3: // bucketName / path
существует в S3.