Spark Structured Streaming не может записать данные в регионы, которые поддерживают только Signature Version 4 - PullRequest
0 голосов
/ 13 февраля 2019

Я играл с записью выходных данных задания Spark Structured Streaming на S3:

case class Foo(month: Int, day: Int, value: Double)

val rand = new scala.util.Random()

def runSpark(s: SparkSession, endpoint: String, bucket: String): Unit = {
  s.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "***")
  s.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "***")
  s.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", endpoint)

  val rateStream = s.readStream
    .format("rate")
    .option("rowsPerSecond", "5")
    .option("numPartitions", "1")
    .load()

  import s.implicits._

  val fooStream: Dataset[Foo] = rateStream
    .select(col("timestamp"))
    .as[Timestamp]
    .map { ts =>
      val lt = ts.toLocalDateTime
      Foo(lt.getMonthValue, lt.getDayOfMonth, rand.nextDouble)
    }

  val query = fooStream.writeStream
    .partitionBy("month", "day")
    .format("avro")
    .outputMode(OutputMode.Append)
    .option("path", s"s3a://$bucket/avro-input/")
    .option("checkpointLocation", "/tmp/checkpoint/")
    .start()

  query.awaitTermination()
}

Моя настройка:

  • Spark - 2.4.0
  • Hadoop - 2.7.0
  • AWS SDK - 1.7.4

Параметры конфигурации Spark:

spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

Когда я пытаюсь записать в us-east-1(Н. Вирджиния) Я не вижу никаких проблем, работа продолжается, пока я не завершил ее вручную.С другой стороны, когда я пытаюсь написать в us-east-2 (штат Огайо), я продолжаю получать следующую ошибку после небольшой работы в течение небольшого промежутка времени:

[info] o.a.s.s.e.d.FileFormatWriter - Aborting job f244e98b-672b-4b5a-8b63-a0c1257c3236.
[info] com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: BBC46B07C469B019, AWS Error Code: SignatureDoesNotMatch, AWS Error Message: The request signature we calculated does not match the signature you provided. Check your key and signing method.
[info]  at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
[info]  at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
[info]  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
[info]  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
[info]  at com.amazonaws.services.s3.AmazonS3Client.copyObject(AmazonS3Client.java:1507)
[info]  at com.amazonaws.services.s3.transfer.internal.CopyCallable.copyInOneChunk(CopyCallable.java:143)
[info]  at com.amazonaws.services.s3.transfer.internal.CopyCallable.call(CopyCallable.java:131)
[info]  at com.amazonaws.services.s3.transfer.internal.CopyMonitor.copy(CopyMonitor.java:189)
[info]  at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:134)
[info]  at com.amazonaws.services.s3.transfer.internal.CopyMonitor.call(CopyMonitor.java:46)
[info]  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]  at java.lang.Thread.run(Thread.java:748)

Я знаю, что регион Огайоподдерживает только Signature Version 4 и AWS SDK 1.7.4 по умолчанию V2, поэтому при попытке записи туда я должен добавить следующую строку:

System.setProperty(SDKGlobalConfiguration.ENABLE_S3_SIGV4_SYSTEM_PROPERTY, "true")

Интересно, что я мог написать в us-east-1 (NВирджиния), которая поддерживает обе версии без каких-либо проблем с включенным V4 ...

...