Spark настраиваемый источник метри c, не инициализируется на исполнителях в режиме YARN - PullRequest
0 голосов
/ 15 апреля 2020

У меня проблема с тем, что я не могу получить свой пользовательский источник для исполнителей в режиме YARN. В локальном режиме все работает. Вот фрагмент кода:

  val NAME = "eraser"

  private lazy val source = {
    logInfo(s"Check Registered Metric system on ${SparkEnv.get.executorId}" +
      s" ${SparkEnv.get.metricsSystem.getSourcesByName("jvm").mkString(",")}")
    logInfo(s"Check Registered Metric system on ${SparkEnv.get.executorId} " +
      s"${SparkEnv.get.metricsSystem.getSourcesByName(NAME).mkString(",")}")
    SparkEnv.get.metricsSystem
      .getSourcesByName("jvm")
      .head
  }
}

Метрики JVM существуют, но мой клиентский источник - нет. Вот как я его инициализирую.

.config(s"spark.metrics.conf.*.source.${MetricsHelper.NAME}.class",
        "org.apache.spark.metrics.source.DatalakeEraserSource")
      .config("spark.metrics.conf.*.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")

// or SparkEnv.get.metricsSystem.registerSource(new DatalakeEraserSource())

Если я попытаюсь использовать мой источник на исполнителях, я получу пустой итератор SparkEnv.get.metricsSystem.getSourcesByName("jvm")

Вот здесь трассировка стека.

ob aborted due to stage failure: Task 10 in stage 3.0 failed 4 times, most recent failure: Lost task 10.3 in stage 3.0 (TID 14, spotcluster-29057-198-prod.eu1.appsflyer.com, executor 1): java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
at scala.collection.IterableLike$class.head(IterableLike.scala:107)
at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:48)
at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
at scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:48)
at com.appsflyer.datalake.s3.eraser.MetricsHelper$.source$lzycompute(MetricsHelper.scala:17)
at com.appsflyer.datalake.s3.eraser.MetricsHelper$.source(MetricsHelper.scala:10)
at com.appsflyer.datalake.s3.eraser.MetricsHelper$.countDeleteRequestPerFile(MetricsHelper.scala:21)
at org.datalake.s3.eraser.Eraser$$anonfun$4.apply(Eraser.scala:118)
at org.datalake.s3.eraser.Eraser$$anonfun$4.apply(Eraser.scala:116)
at org.apache.spark.sql.KeyValueGroupedDataset$$anonfun$1.apply(KeyValueGroupedDataset.scala:196)
at org.apache.spark.sql.KeyValueGroupedDataset$$anonfun$1.apply(KeyValueGroupedDataset.scala:196)
at org.apache.spark.sql.execution.MapGroupsExec$$anonfun$10$$anonfun$apply$4.apply(objects.scala:337)
at org.apache.spark.sql.execution.MapGroupsExec$$anonfun$10$$anonfun$apply$4.apply(objects.scala:336)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
...

Как я могу избежать этого? Мне нужно контролировать свои показатели отдельно от JVM. Я не могу использовать MetricRegistry из jvm источника.

...