My Spark cluster has the following setup:
Hadoop 3.1
Spark 2.4.4
Scala 2.12.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
JARs:
hadoop-aws-3.1.0.jar
spark-hadoop-cloud_2.11-2.3.2.3.1.0.6-1.jar
These are the relevant Spark settings (a sketch of setting these programmatically follows the list):
(spark.hadoop.fs.s3a.committer.staging.unique-filenames,true)
(spark.hadoop.fs.s3.impl,org.apache.hadoop.fs.s3a.S3AFileSystem)
(spark.hadoop.fs.s3a.committer.name,partitioned)
(spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version,10)
(spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads,true)
(spark.hadoop.fs.s3a.server-side-encryption-algorithm,SSE-KMS)
(spark.sql.parquet.output.committer.class,org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter)
(spark.hadoop.fs.s3a.committer.magic.enabled,false)
(spark.hadoop.fs.s3a.committer.staging.conflict-mode,replace)
(spark.sql.sources.commitProtocolClass,org.apache.spark.internal.io.cloud.PathOutputCommitProtocol)
(spark.hadoop.fs.s3a.impl,org.apache.hadoop.fs.s3a.S3AFileSystem)
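For reference, this is roughly how the same settings could be applied from code in a standalone app instead of spark-defaults.conf. A minimal sketch only: the app name is a placeholder, and just a subset of the keys above is shown.

import org.apache.spark.sql.SparkSession

// Sketch: committer settings applied at session build time.
// Key names and values are taken verbatim from the list above.
val spark = SparkSession.builder()
  .appName("s3a-committer-test") // placeholder name, not part of the original setup
  .config("spark.hadoop.fs.s3a.committer.name", "partitioned")
  .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()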
There are no S3 permission errors; the IAM role has the permissions listed in this guide (a minimal policy sketch follows the list): https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/bk_cloud-data-access/content/iam-role-permissions.html
s3:Get*
s3:Delete*
s3:Put*
s3:ListBucket
s3:ListBucketMultipartUploads
s3:AbortMultipartUpload
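For completeness, a minimal IAM policy statement carrying those actions might look like the sketch below; the bucket name is a placeholder, and the Resource scoping would need to match the actual bucket layout.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:Get*",
        "s3:Delete*",
        "s3:Put*",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:AbortMultipartUpload"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket>",
        "arn:aws:s3:::<bucket>/*"
      ]
    }
  ]
}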
This is the contents of my _SUCCESS file:
{
  "name" : "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1",
  "timestamp" : 1586969903889,
  "date" : "Thu Apr 16 00:58:23 SGT 2020",
  "hostname" : "<removed>",
  "committer" : "partitioned",
  "description" : "Task committer attempt_20200416005750_0000_m_000000_0",
  "metrics" : {
    "stream_write_block_uploads" : 0,
    "files_created" : 0,
    "S3guard_metadatastore_put_path_latencyNumOps" : 0,
    "stream_write_block_uploads_aborted" : 0,
    "committer_commits_reverted" : 0,
    "op_open" : 0,
    "stream_closed" : 0,
    "committer_magic_files_created" : 0,
    "object_copy_requests" : 0,
    "s3guard_metadatastore_initialization" : 0,
    "S3guard_metadatastore_put_path_latency90thPercentileLatency" : 0,
    "stream_write_block_uploads_committed" : 0,
    "S3guard_metadatastore_throttle_rate75thPercentileFrequency (Hz)" : 0,
    "S3guard_metadatastore_throttle_rate90thPercentileFrequency (Hz)" : 0,
    "committer_bytes_committed" : 0,
    "op_create" : 0,
    "stream_read_fully_operations" : 0,
    "committer_commits_completed" : 0,
    "object_put_requests_active" : 0,
    "s3guard_metadatastore_retry" : 0,
    "stream_write_block_uploads_active" : 0,
    "stream_opened" : 0,
    "S3guard_metadatastore_throttle_rate95thPercentileFrequency (Hz)" : 0,
    "op_create_non_recursive" : 0,
    "object_continue_list_requests" : 0,
    "committer_jobs_completed" : 1,
    "S3guard_metadatastore_put_path_latency50thPercentileLatency" : 0,
    "stream_close_operations" : 0,
    "stream_read_operations" : 0,
    "object_delete_requests" : 0,
    "fake_directories_deleted" : 0,
    "stream_aborted" : 0,
    "op_rename" : 0,
    "object_multipart_aborted" : 0,
    "committer_commits_created" : 0,
    "op_get_file_status" : 5,
    "s3guard_metadatastore_put_path_request" : 3,
    "committer_commits_failed" : 0,
    "stream_bytes_read_in_close" : 0,
    "op_glob_status" : 0,
    "stream_read_exceptions" : 0,
    "op_exists" : 3,
    "S3guard_metadatastore_throttle_rate50thPercentileFrequency (Hz)" : 0,
    "S3guard_metadatastore_put_path_latency95thPercentileLatency" : 0,
    "stream_write_block_uploads_pending" : 0,
    "directories_created" : 0,
    "S3guard_metadatastore_throttle_rateNumEvents" : 0,
    "S3guard_metadatastore_put_path_latency99thPercentileLatency" : 0,
    "stream_bytes_backwards_on_seek" : 0,
    "stream_bytes_read" : 0,
    "stream_write_total_data" : 0,
    "committer_jobs_failed" : 0,
    "stream_read_operations_incomplete" : 0,
    "files_copied_bytes" : 0,
    "op_delete" : 0,
    "object_put_bytes_pending" : 0,
    "stream_write_block_uploads_data_pending" : 0,
    "op_list_located_status" : 0,
    "object_list_requests" : 6,
    "stream_forward_seek_operations" : 0,
    "committer_tasks_completed" : 0,
    "committer_commits_aborted" : 0,
    "object_metadata_requests" : 10,
    "object_put_requests_completed" : 0,
    "stream_seek_operations" : 0,
    "op_list_status" : 1,
    "store_io_throttled" : 0,
    "stream_write_failures" : 0,
    "op_get_file_checksum" : 0,
    "files_copied" : 0,
    "ignored_errors" : 0,
    "committer_bytes_uploaded" : 0,
    "committer_tasks_failed" : 0,
    "stream_bytes_skipped_on_seek" : 0,
    "op_list_files" : 0,
    "files_deleted" : 0,
    "stream_bytes_discarded_in_abort" : 0,
    "op_mkdirs" : 0,
    "op_copy_from_local_file" : 0,
    "op_is_directory" : 1,
    "s3guard_metadatastore_throttled" : 0,
    "S3guard_metadatastore_put_path_latency75thPercentileLatency" : 0,
    "stream_write_total_time" : 0,
    "stream_backward_seek_operations" : 0,
    "object_put_requests" : 0,
    "object_put_bytes" : 0,
    "directories_deleted" : 0,
    "op_is_file" : 0,
    "S3guard_metadatastore_throttle_rate99thPercentileFrequency (Hz)" : 0
  },
  "diagnostics" : {
    "fs.s3a.metadatastore.impl" : "org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore",
    "fs.s3a.committer.magic.enabled" : "false",
    "fs.s3a.metadatastore.authoritative" : "false"
  },
  "filenames" : [ ]
}
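Note that the manifest reports a completed job ("committer_jobs_completed" : 1) yet zero commits ("committer_commits_created" : 0, "files_created" : 0) and an empty "filenames" list. For reference, those fields can be pulled out of the manifest in spark-shell; a sketch only, where <destination> is the same placeholder as in the test below:

scala> val success = spark.read.option("multiLine", "true").json("<destination>/_SUCCESS")
scala> success.select("committer", "filenames", "metrics.committer_commits_created", "metrics.files_created").show(false)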
To test the write, I simply do the following:
scala> val df = spark.read.parquet(<origin>) // have confirmed that the df is not empty
scala> df.write.format("parquet").save(<destination>) // have confirmed when FileCommitter is used, output is written
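To double-check what actually lands at the destination, the path can be listed with the Hadoop FileSystem API; a sketch, with <destination> being the same placeholder as above:

scala> import org.apache.hadoop.fs.{FileSystem, Path}
scala> val fs = FileSystem.get(new java.net.URI("<destination>"), spark.sparkContext.hadoopConfiguration)
scala> fs.listStatus(new Path("<destination>")).foreach(s => println(s.getPath)) // expect part-* parquet files here after a successful commit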
Possible issues:
1) My Spark cluster runs on Nomad with no HDFS installed. I have confirmed that the data is written locally to disk on the Spark workers under /tmp/staging, but it is never written to S3 (one way to check the staging path is sketched after this list).
2) ???
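If (1) is the culprit: the S3A staging committers keep their intermediate commit data under fs.s3a.committer.staging.tmp.path, which is expected to resolve to a cluster-wide filesystem such as HDFS. One quick check is to print what that key is set to (a sketch; get returns null when the key is unset and the default applies):

scala> spark.sparkContext.hadoopConfiguration.get("fs.s3a.committer.staging.tmp.path")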