Spark пишет файл, используя s3a в многопоточном режиме - PullRequest
0 голосов
/ 22 декабря 2019

Я использую локальную искру для чтения и записи с s3. Для обратной записи в s3 я использую параллельную утилиту java, чтобы я мог писать многопоточным способом.

Вот моя реализация этого

ConvertToCsv У этого метода есть искра. действие записи

 for ( String Id: listOfId) {

                Future<?> future = executor.submit( () -> {

                ConvertToCsv( dataFrame, destinationPath, Id);
            } );
            futures.add( future );

        } 

Я получаю эту ошибку!

Нет такого файла или каталога: s3a: // kira-bucket-parquet / collection / 82709dd1-8924-481c-9d93-14a9e2e0c524 / 5e67e9d5-2d8b-4c4b-928a-4736485af3ca / _tevent / 0 в org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus (S3AFileSystem.java:22.Sache.Sa.Fache.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.SAH.SAF.SHA.FSAH.SHA.FSAH.SHA.SHA.FSAH.SHA.SHA.SHA.FASH.FSAH.SHA.SHA.FSAH.SHA.SHA.SHA.FSAH.SHA.SFS.AFS.HAFSF), или файловой системе. .innerGetFileStatus (S3AFileSystem. ) в org.apache.hadoop.fs.s3a.S3AFileSystem.lambda $ listStatus $ 9 (S3AFileSystem.java:1882) в org.apache.hadoop.fs.s3a.Invoker.once (Invoker.java:109) в org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus (S3AFileSystem.java:1882) в org.apache.hadoop.fs.FileSystem.listStatus (FileSystem.java:1919) в org.apache.hadoop.fs.FileSystem.listStatus (FileSystem.java:1961) в org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter. getAllCommittedTaskPaths (FileOutputCommitter.java:269) в org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob (FileOutputCommitter.java:309) в org.apache.spark.internal.ioProedHoopCoopMoop.MedSMM166) по адресу org.apache.spark.sql.execution.datasources.FileFormatWriter $ .write (FileFormatWriter.scala: 213) по адресу org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run (InsertIntoChad.Form)org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult $ lzycompute (commands.scala: 104) в org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult (commands.scala: 102) в org. apache.spark.sql.execution.command.DataWritingCommandExec.doExecute (commands.scala: 122) в илиg.apache.spark.sql.execution.SparkPlan $$ anonfun $ execute $ 1.apply (SparkPlan.scala: 131) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ execute $ 1.apply (SparkPlan.scala: 127) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ executeQuery $ 1.apply (SparkPlan.scala: 155) в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151)в org.apache.spark.sql.execution.SparkPlan.executeQuery (SparkPlan.scala: 152) в org.apache.spark.sql.execution.SparkPlan.execute (SparkPlan.scala: 127) в org.apache.spark.sql.execution.QueryExecution.toRdd $ lzycompute (QueryExecution.scala

Решение, с которым я столкнулся, заключается в настройке коммитеров s3a.

Как настроить коммитеры S3a в локальной искре? и есть ли альтернативное решение?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...