Спарк на EMR - com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter - PullRequest
0 голосов
/ 04 февраля 2020

Я использую Spark 2.4.4 на EMR 5.29.0. Я заметил, что это быстро обрабатывает данные и записывает, но так медленно, когда переименовывает файлы на S3, также я прочитал следующие журналы:

20/02/04 12:04:08 INFO ParquetFileFormat: Using user defined output committer for Parquet: com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
20/02/04 12:04:08 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
20/02/04 12:04:08 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
20/02/04 12:04:08 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
20/02/04 12:04:08 INFO EmrOptimizedParquetOutputCommitter: EMR Optimized committer is not supported by this filesystem (org.apache.hadoop.hdfs.DistributedFileSystem)
20/02/04 12:04:08 INFO EmrOptimizedParquetOutputCommitter: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
20/02/04 12:04:08 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
20/02/04 12:04:08 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
20/02/04 12:04:08 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
20/02/04 12:04:08 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
20/02/04 12:04:08 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter

Я ожидал использовать коммиттер EMROptimized, но это выглядит как будто он не совместим с spark. sql .sources.partitionOverwriteMode установлен в Dynami c, что является моим случаем. Поскольку он является откатом к универсальному коммиттеру c, я также добавил следующую опцию: spark.had oop .mapreduce.fileoutputcommitter.algorithm.version = 2 Получение некоторых улучшений, но все еще есть возможности для улучшения. Какой может быть лучший подход, учитывая вышесказанное?

...