Восстановление структурированной потоковой передачи Spark после исключения запроса - PullRequest
0 голосов
/ 07 мая 2020

Возможно ли автоматическое восстановление после исключения, созданного во время выполнения запроса?

Контекст: Я разрабатываю приложение Spark, которое считывает данные из Kafka topi c, обрабатывает данные и выводит их на S3. Однако после нескольких дней работы в производственной среде приложение Spark сталкивается с некоторыми сбоями в сети из S3, которые вызывают исключение и останавливают приложение. Также стоит отметить, что это приложение работает в Kubernetes с использованием GCP's Spark k8s Operator .

Из того, что я видел до сих пор, эти исключения незначительны, и простой перезапуск приложения решает проблему. вопрос. Можем ли мы обработать эти исключения и автоматически перезапустить структурированный потоковый запрос?

Вот пример выброшенного исключения:

    Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
    === Streaming Query ===
    Identifier: ...
    Current Committed Offsets: ...
    Current Available Offsets: ...

    Current State: ACTIVE
    Thread State: RUNNABLE

    Logical Plan: ...

        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
    Caused by: org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
        at io.blahblahView$$anonfun$11$$anonfun$apply$2.apply(View.scala:90)
        at io.blahblahView $$anonfun$11$$anonfun$apply$2.apply(View.scala:82)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at io.blahblahView$$anonfun$11.apply(View.scala:82)
        at io.blahblahView$$anonfun$11.apply(View.scala:79)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
        ... 1 more
    Caused by: java.io.FileNotFoundException: No such file or directory: s3a://.../view/v1/_temporary/0
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:993)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:734)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:291)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:361)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
        at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:187)
        ... 47 more

Какой самый простой способ автоматически решать такие проблемы?

Ответы [ 2 ]

2 голосов
/ 08 мая 2020

Проведя слишком много часов, пытаясь найти элегантное решение этой проблемы, но ничего не обнаружив, вот что я придумал.

Кто-то может сказать, что это взлом, но он простой, работает и решает сложную проблему. Я тестировал его на производстве, и он решает проблему автоматического восстановления после сбоя из-за случайного незначительного исключения.

Я называю его Watchdog . Вот простейшая версия, в которой сторожевой таймер будет повторять выполнение запроса бесконечно:

val writer = df.writeStream...

while (true) {
   val query = writer.start()

   try {
        query.awaitTermination()
   } 
   catch {
       case e: StreamingQueryException => println("Streaming Query Exception caught!: " + e);
   }
}

Некоторые люди могут захотеть заменить while(true) каким-либо счетчиком, чтобы ограничить количество повторных попыток. Кто-то также может дополнить этот код и отправлять уведомления через Slack или по электронной почте всякий раз, когда происходит повторная попытка. Другие могут просто собрать количество повторных попыток в Prometheus.

Надеюсь, это поможет,

Ура

0 голосов
/ 08 мая 2020

Нет, надежного способа сделать это нет. Кстати, нет - тоже ответ.

  • Logi c для проверки исключений обычно выполняется через try / catch, запущенный на драйвере.

  • Поскольку непредвиденные ситуации на уровне исполнителя уже стандартно обрабатываются самой Spark Framework для структурированной потоковой передачи, и если ошибка не подлежит исправлению, тогда приложение / задание просто аварийно завершает работу после сигнализации ошибка (и) возвращается к драйверу, если вы не закодируете try / catch в различных конструкциях foreachXXX.

    • Тем не менее, для конструкций foreachXXX не ясно, что микропакет будет восстановлен при таком подходе. Afaics, некоторая часть микропакета с большой вероятностью потеряна. Однако сложно протестировать.
  • Учитывая, что в Spark есть стандартные вещи, к которым вы не можете подключиться, почему можно было вставить al oop или попробовать / поймать в источник программы? Точно так же широковещательные переменные являются проблемой - хотя, как говорят, у некоторых есть методы решения этой проблемы. Но это не в духе фреймворка.

Итак, хороший вопрос, поскольку мне интересно (ред) об этом (в прошлом).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...