Я использую локальную искру для чтения и записи с s3. Для обратной записи в s3 я использую параллельную утилиту java, чтобы я мог писать многопоточным способом.
Вот моя реализация этого
ConvertToCsv У этого метода есть искра. действие записи
for ( String Id: listOfId) {
Future<?> future = executor.submit( () -> {
ConvertToCsv( dataFrame, destinationPath, Id);
} );
futures.add( future );
}
Я получаю эту ошибку!
Нет такого файла или каталога: s3a: // kira-bucket-parquet / collection / 82709dd1-8924-481c-9d93-14a9e2e0c524 / 5e67e9d5-2d8b-4c4b-928a-4736485af3ca / _tevent / 0 в org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus (S3AFileSystem.java:22.Sache.Sa.Fache.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.Sha.SAH.SAF.SHA.FSAH.SHA.FSAH.SHA.SHA.FSAH.SHA.SHA.SHA.FASH.FSAH.SHA.SHA.FSAH.SHA.SHA.SHA.FSAH.SHA.SFS.AFS.HAFSF), или файловой системе. .innerGetFileStatus (S3AFileSystem. ) в org.apache.hadoop.fs.s3a.S3AFileSystem.lambda $ listStatus $ 9 (S3AFileSystem.java:1882) в org.apache.hadoop.fs.s3a.Invoker.once (Invoker.java:109) в org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus (S3AFileSystem.java:1882) в org.apache.hadoop.fs.FileSystem.listStatus (FileSystem.java:1919) в org.apache.hadoop.fs.FileSystem.listStatus (FileSystem.java:1961) в org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter. getAllCommittedTaskPaths (FileOutputCommitter.java:269) в org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob (FileOutputCommitter.java:309) в org.apache.spark.internal.ioProedHoopCoopMoop.MedSMM166) по адресу org.apache.spark.sql.execution.datasources.FileFormatWriter $ .write (FileFormatWriter.scala: 213) по адресу org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run (InsertIntoChad.Form)org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult $ lzycompute (commands.scala: 104) в org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult (commands.scala: 102) в org. apache.spark.sql.execution.command.DataWritingCommandExec.doExecute (commands.scala: 122) в илиg.apache.spark.sql.execution.SparkPlan $$ anonfun $ execute $ 1.apply (SparkPlan.scala: 131) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ execute $ 1.apply (SparkPlan.scala: 127) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ executeQuery $ 1.apply (SparkPlan.scala: 155) в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151)в org.apache.spark.sql.execution.SparkPlan.executeQuery (SparkPlan.scala: 152) в org.apache.spark.sql.execution.SparkPlan.execute (SparkPlan.scala: 127) в org.apache.spark.sql.execution.QueryExecution.toRdd $ lzycompute (QueryExecution.scala
Решение, с которым я столкнулся, заключается в настройке коммитеров s3a.
Как настроить коммитеры S3a в локальной искре? и есть ли альтернативное решение?