Я играл с записью выходных данных задания Spark Structured Streaming на S3:
case class Foo(month: Int, day: Int, value: Double)
val rand = new scala.util.Random()
def runSpark(s: SparkSession, endpoint: String, bucket: String): Unit = {
s.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "***")
s.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "***")
s.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", endpoint)
val rateStream = s.readStream
.format("rate")
.option("rowsPerSecond", "5")
.option("numPartitions", "1")
.load()
import s.implicits._
val fooStream: Dataset[Foo] = rateStream
.select(col("timestamp"))
.as[Timestamp]
.map { ts =>
val lt = ts.toLocalDateTime
Foo(lt.getMonthValue, lt.getDayOfMonth, rand.nextDouble)
}
val query = fooStream.writeStream
.partitionBy("month", "day")
.format("avro")
.outputMode(OutputMode.Append)
.option("path", s"s3a://$bucket/avro-input/")
.option("checkpointLocation", "/tmp/checkpoint/")
.start()
query.awaitTermination()
}
Моя настройка:
- Spark - 2.4.0
- Hadoop - 2.7.0
- AWS SDK - 1.7.4
Параметры конфигурации Spark:
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
Когда я пытаюсь записать в us-east-1
(Н. Вирджиния) Я не вижу никаких проблем, работа продолжается, пока я не завершил ее вручную.С другой стороны, когда я пытаюсь написать в us-east-2
(штат Огайо), я продолжаю получать следующую ошибку после небольшой работы в течение небольшого промежутка времени:
[info] o.a.s.s.e.d.FileFormatWriter - Aborting job f244e98b-672b-4b5a-8b63-a0c1257c3236.
[info] com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: BBC46B07C469B019, AWS Error Code: SignatureDoesNotMatch, AWS Error Message: The request signature we calculated does not match the signature you provided. Check your key and signing method.
[info] at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
[info] at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
[info] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
[info] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
[info] at com.amazonaws.services.s3.AmazonS3Client.copyObject(AmazonS3Client.java:1507)
[info] at com.amazonaws.services.s3.transfer.internal.CopyCallable.copyInOneChunk(CopyCallable.java:143)
[info] at com.amazonaws.services.s3.transfer.internal.CopyCallable.call(CopyCallable.java:131)
[info] at com.amazonaws.services.s3.transfer.internal.CopyMonitor.copy(CopyMonitor.java:189)
[info] at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:134)
[info] at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:46)
[info] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info] at java.lang.Thread.run(Thread.java:748)
Я знаю, что регион Огайоподдерживает только Signature Version 4 и AWS SDK 1.7.4
по умолчанию V2, поэтому при попытке записи туда я должен добавить следующую строку:
System.setProperty(SDKGlobalConfiguration.ENABLE_S3_SIGV4_SYSTEM_PROPERTY, "true")
Интересно, что я мог написать в us-east-1
(NВирджиния), которая поддерживает обе версии без каких-либо проблем с включенным V4 ...