Сообщить о пользовательских метриках искры в cloudwatch - PullRequest
0 голосов
/ 13 апреля 2020

Я использую EMR в AWS, и я хотел создать «CloudWatchSink», чтобы я мог выгружать пользовательские метрики в Cloudwatch.

Я успешно зарегистрировал настраиваемого репортера, а затем добавил «консольный приемник», и я вижу печать пользовательских метрик в канал stdout.

Когда я пытаюсь переключиться с MyConsoleSink на MyCloudwatchSink, исполнители не запускаются.

implicit val spark = SparkSession.builder
    .appName(getClass.getSimpleName.stripSuffix("$"))
//    .config("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink") // WORKS
//    .config("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.mystuff.MyConsoleSink") // WORKS
    .config("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.mystuff.MyCloudwatchSink") // NOPE
    .getOrCreate

Я понимаю, что регистрирую MyCloudwatchSink как sink.console.class, и это, вероятно, неправильно, но я не уверен, что мне следует делать вместо этого.

Я использую почти дословно копия репортера азагиотов

private[spark] class MyCloudwatchSink(
  val property: java.util.Properties,
  val registry: com.codahale.metrics.MetricRegistry,
  securityMgr: org.apache.spark.SecurityManager
) extends Sink {

  private lazy val amazonCloudWatchAsync = AmazonCloudWatchAsyncClientBuilder
    .standard()
    .withRegion(Regions.US_EAST_1)
    .build

  private lazy val cloudWatchReporter = {
    //TODO update registry prefix
    CloudWatchReporter.forRegistry(registry, amazonCloudWatchAsync, "spark-metrics-example")
      .convertRatesTo(TimeUnit.SECONDS)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .filter(MetricFilter.ALL)
      .withPercentiles(Percentile.P75, Percentile.P99)
      .withOneMinuteMeanRate
      .withFiveMinuteMeanRate
      .withFifteenMinuteMeanRate
      .withMeanRate
      .withArithmeticMean
      .withStdDev
      .withStatisticSet
      .withZeroValuesSubmission
      .withReportRawCountValue
      .withHighResolution
      .withMeterUnitSentToCW(StandardUnit.Bytes)
      .withJvmMetrics
      .withGlobalDimensions("Region=us-west-2", "Instance=stage")
      //.withDryRun //TODO remove this when ready!
      .build
  }

  override def start(): Unit = cloudWatchReporter.start(30, TimeUnit.SECONDS)

  override def stop(): Unit = cloudWatchReporter.stop()

  override def report(): Unit = cloudWatchReporter.report()
}

Spark генерирует следующее исключение:

20/04/13 19:18:24 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 1 for reason Container marked as failed: container_1586789645936_0006_01_000002 on host: ip-10-0-248-202.ec2.internal. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1586789645936_0006_01_000002
Exit code: 1
Stack trace: ExitCodeException exitCode=1: 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
    at org.apache.hadoop.util.Shell.run(Shell.java:869)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:235)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Спасибо!

...