Может кто-нибудь помочь мне понять причину этой ошибки:
ERROR Query alert [id = d19f51b1-8131-40dd-ab62, runId = 276833a0-235f-4d2e-bd61] terminated with error
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker$.metrics(BasicWriteStatsTracker.scala:180)
at org.apache.spark.sql.execution.streaming.FileStreamSink.basicWriteJobStatsTracker(FileStreamSink.scala:103)
at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:140)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:568)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:566)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:565)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:207)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:175)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:169)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:296)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Конфигурации кластера:
Databricks Runtime 5.5 LTS
Scala 2.11
Spark 2.4.3
Драйвер: 64 ГБ памяти, 16 ядер, 3DBU
Рабочие узлы: 64 ГБ памяти, 16 ядер, 3DBU (2-4 рабочих узла, автоматическое масштабирование)
Есть 3 потоковых запроса, выполняющихся параллельно, как определено в fairscheduler.xml
Конфигурации Spark:
spark.sql.autoBroadcastJoinThreshold=-1
spark.sql.broadcastTimeout=1200
spark.executor.instances=4
spark.executor.cores=16
spark.executor.memory=29g
spark.sql.shuffle.partitions=32
spark.default.parallelism=32
spark.driver.maxResultSize=25g
spark.scheduler.mode=FAIR
spark.scheduler.allocation.file=/dbfs/config/fairscheduler.xml
Ниже приведён код потоковой обработки:
/** Enriches any value with a helper for optionally applying a transformation in a call chain.
  *
  * @param value the wrapped value the transformation may be applied to
  * @tparam A the type of the wrapped value, preserved by the transformation
  */
implicit class PipedObject[A](value: A) {

  /** Applies `f` to the wrapped value only when `pred` is true.
    *
    * @param f    transformation to apply; it is not invoked when `pred` is false
    * @param pred whether the transformation should be applied
    * @return `f(value)` when `pred` is true, otherwise the wrapped value unchanged
    */
  def conditionalPipe(f: A => A)(pred: Boolean): A = {
    if (!pred) value
    else f(value)
  }
}
// Build (or reuse) the session. conditionalPipe(...)(false) returns the builder unchanged,
// so master("local[6]") is NOT applied here and the master has to come from the environment.
implicit val spark: SparkSession = SparkSession
  .builder()
  .appName("MyApp")
  .conditionalPipe(sess => sess.master("local[6]"))(false)
  .getOrCreate()
import spark.implicits._

val cookedData = getCookedStreamingData() // streaming data as input from event hub

// The scheduler pool is a thread-local property, so it is set immediately before each
// start() call to bind that query to its own pool.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "cook")
cookedData.writeStream
  .option("checkpointLocation", "checkpointLocation1")
  .queryName("queryName1")
  .format("avro")
  .option("path", "dir1")
  .start()

val scoredData = score(cookedData)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "score")
scoredData.writeStream
  .option("checkpointLocation", "checkpointLocation2")
  .queryName("queryName2")
  .format("avro")
  .option("path", "dir2")
  .start()

// NOTE(review): the alert stream is produced by calling score() a second time on the
// already scored data; confirm that a dedicated alert transformation was not intended.
val alertData = score(scoredData)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "alert")
alertData.writeStream
  .option("checkpointLocation", "checkpointLocation3")
  .queryName("queryName3")
  .format("avro")
  .option("path", "dir3")
  .start()

// start() returns immediately; without blocking here the driver code can run to completion
// and the SparkContext is stopped while the queries are still executing micro-batches.
// A file sink batch that runs after that finds no active SparkContext, which is consistent
// with the "None.get" raised from BasicWriteJobStatsTracker.metrics in the reported trace.
// awaitAnyTermination() keeps the driver alive and rethrows the failure of a terminated query.
spark.streams.awaitAnyTermination()
Пример файла fairscheduler.xml:
<!-- Fair scheduler pool definitions; the configuration above points
     spark.scheduler.allocation.file at /dbfs/config/fairscheduler.xml. -->
<allocations>
<!-- Pool named "default"; the driver code shown never selects it explicitly.
     Jobs inside this pool are ordered FIFO. -->
<pool name="default">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>2</minShare>
</pool>
<!-- Pool selected with spark.scheduler.pool = "cook" before the first streaming query starts. -->
<pool name="cook">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>5</minShare>
</pool>
<!-- Pool selected with spark.scheduler.pool = "score" before the second streaming query starts. -->
<pool name="score">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>5</minShare>
</pool>
<!-- Pool selected with spark.scheduler.pool = "alert" before the third streaming query starts. -->
<pool name="alert">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>5</minShare>
</pool>
</allocations>