FileNotFoundException when writing data from a structured stream to an S3 checkpoint

From my Spark code I am trying to write data to S3.

Why does the write fail with a FileNotFoundException and terminate the process?

ctField(event_time,StringType,true), StructField(user_uuid,StringType,true), StructField(status,StringType,true)),true), StructField(version_event,StructType(StructField(oem_model_version,StringType,true), StructField(XXX_model_version,StringType,true), StructField(XXX_model,StringType,true), StructField(dsn,StringType,true), StructField(oem_id,StringType,true)),true), cast(value#8 as string), Some(UTC)) AS event#21]
                   +- StreamingExecutionRelation KafkaV2[Subscribe[DataPipelineUSField]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

      at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
      at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://XXX/checkpointLocation/commits/.11.650308d9-14d8-4bc4-a41d-bdc92b73f745.tmp
      at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)
      at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:117)
      at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2318)
      at org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1283)
      at org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:204)
      at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:761)
      at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:691)
      at org.apache.hadoop.fs.FileContext.rename(FileContext.java:966)
      at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
      at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
      at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
      at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
      at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
      at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
      at scala.Option.getOrElse(Option.scala:121)
      at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:547)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:545)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:545)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:545)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
      at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
      at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
      at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
      at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
      ... 1 more
***************** ERROR: DataLakeIngestion Query [id = efcac653-ead6-4a18-90b3-6c79dcb39cb3, runId = b45d6cc0-d110-44ce-8a17-90d59892207c] terminated with exception: No such file or directory: s3a://XXX/checkpointLocation/commits/.11.650308d9-14d8-4bc4-a41d-bdc92b73f745.tmp
20/04/20 19:02:31 INFO ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
20/04/20 19:02:31 INFO SparkContext: Invoking stop() from shutdown hook
20/04/20 19:02:31 INFO SparkUI: Stopped Spark web UI at http://XXX.ec2.internal:37981
20/04/20 19:02:31 INFO YarnAllocator: Driver requested a total number of 0 executor(s).

Here is the code:

1. Get Spark Context

SparkSession sparkSession = builder.getOrCreate();
session = sparkSession;
sparkContext = sparkSession.sparkContext();
sparkContext.hadoopConfiguration()
        .set("fs.defaultFS", defaultFs);
sparkContext.hadoopConfiguration()
        .set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
sparkContext.hadoopConfiguration()
        .set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
sparkContext.hadoopConfiguration()
        .set("fs.s3n.awsAccessKeyId", s3AwsAccessKeyId);
sparkContext.hadoopConfiguration()
        .set("fs.s3n.awsSecretAccessKey", s3AwsSectetAccessKey);

2. Read input stream

Dataset<Row> inputDf = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaBootstrapServers)
        .option("kafka.request.timeout.ms", 120000)
        .option("kafkaConsumer.pollTimeoutMs", 120000)
        .option("kafka.default.api.timeout.ms", 300000)
        .option("kafka.fetch.max.wait.ms", 5000)
        .option("kafka.max.poll.records", 125)
        .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
        .option("subscribe", kafkaTopic)
        .option("startingOffsets", kafkaStartingOffsets)
        .option("failOnDataLoss", false)
        .option("enable.auto.commit", kafkaEnableAutocommit)
        .load();
3. Get dataset

Dataset<Row> dataDf = inputDf.select(from_json(...
);
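
The from_json call is truncated in the post. Purely for illustration, a minimal sketch of such a projection, using a few field names visible in the schema dumped in the error log (the actual schema and column list are not shown):

// Hypothetical sketch, not the original code: parse the Kafka value column as JSON.
// Uses static imports of org.apache.spark.sql.functions.from_json and col,
// plus org.apache.spark.sql.types.StructType and DataTypes.
StructType eventSchema = new StructType()
        .add("event_time", DataTypes.StringType)
        .add("user_uuid", DataTypes.StringType)
        .add("status", DataTypes.StringType);

Dataset<Row> dataDf = inputDf.select(
        from_json(col("value").cast("string"), eventSchema).alias("event"));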

4. Create SQLContext

SQLContext sqlContext = new SQLContext(sparkSession);

5. Populate dataset

Dataset<Row> df = sqlContext
        .sql("select * from table");

6. Write output

StreamingQuery query = df.writeStream()
        .outputMode("append")
        .option("compression", "snappy")
        .format("parquet")
        .option("checkpointLocation", checkpointLocation)
        .option("path", outputPath)
        .partitionBy("date", "time")
        .trigger(Trigger.ProcessingTime(triggerProcessingTime))
        .start();

query.awaitTermination();

...