Как получить локальную искру на AWS для записи на S3 - PullRequest
3 голосов
/ 22 октября 2019

Я установил Spark 2.4.3 с Hadoop 3.2 на экземпляр AWS EC2. Я использовал спарк (в основном pyspark) в локальном режиме с большим успехом. Приятно иметь возможность раскручивать что-то маленькое, а затем изменять его размер, когда мне нужно питание, и делать все это очень быстро. Когда мне действительно нужно масштабировать, я могу переключиться на EMR и пойти на обед. Все работает гладко, за исключением одной проблемы: я не могу заставить локальную искру надежно писать в S3 (я использую локальное пространство EBS). Это явно связано со всеми проблемами, описанными в документах об ограничениях S3 как файловой системы. Тем не менее, используя последний hadoop, я прочитал документы так: должен иметь возможность заставить его работать.

Обратите внимание, что я знаю об этом другом посте, который задает связанный вопрос;здесь есть некоторые указания, но я не вижу решения. Как использовать новый компоновщик волшебства паркета Hadoop для пользовательского сервера S3 с Spark

У меня есть следующие настройки (установленные в разных местах), следуя моему лучшему пониманию документации здесь: https://hadoop.apache.org/docs/r3.2.1/hadoop-aws/tools/hadoop-aws/index.html

fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem  
fs.s3a.committer.name: directory   
fs.s3a.committer.magic.enabled: false  
fs.s3a.committer.threads: 8 
fs.s3a.committer.staging.tmp.path: /cache/staging  
fs.s3a.committer.staging.unique-filenames: true  
fs.s3a.committer.staging.conflict-mode: fail  
fs.s3a.committer.staging.abort.pending.uploads: true  
mapreduce.outputcommitter.factory.scheme.s3a: org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory  
fs.s3a.connection.maximum: 200  
fs.s3a.fast.upload: true  

Важным моментом является то, что я экономлю с использованием паркета. Я вижу, что ранее была проблема с сохранением паркета, но я не вижу этого упомянутого в последних документах. Может быть, это проблема?

В любом случае, вот ошибка, которую я получаю, что, по-видимому, свидетельствует о том типе ошибки, которую S3 выдает при попытке переименовать временную папку. Есть ли какой-то массив правильных настроек, которые уберут это?

java.io.IOException: Failed to rename S3AFileStatus{path=s3://my-research-lab-recognise/spark-testing/v2/nz/raw/bank/_temporary/0/_temporary/attempt_20190910022011_0004_m_000118_248/part-00118-c8f8259f-a727-4e19-8ee2-d6962020c819-c000.snappy.parquet; isDirectory=false; length=185052; replication=1; blocksize=33554432; modification_time=1568082036000; access_time=0; owner=brett; group=brett; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} isEmptyDirectory=FALSE to s3://my-research-lab-recognise/spark-testing/v2/nz/raw/bank/part-00118-c8f8259f-a727-4e19-8ee2-d6962020c819-c000.snappy.parquet
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:473)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:486)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:597)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:560)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 10 more
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...