Я использую EMR в AWS, и я хотел создать «CloudWatchSink», чтобы я мог выгружать пользовательские метрики в Cloudwatch.
Я успешно зарегистрировал настраиваемого репортера, а затем добавил «консольный приемник», и я вижу печать пользовательских метрик в канал stdout.
Когда я пытаюсь переключиться с MyConsoleSink
на MyCloudwatchSink
, исполнители не запускаются.
implicit val spark = SparkSession.builder
.appName(getClass.getSimpleName.stripSuffix("$"))
// .config("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink") // WORKS
// .config("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.mystuff.MyConsoleSink") // WORKS
.config("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.mystuff.MyCloudwatchSink") // NOPE
.getOrCreate
Я понимаю, что регистрирую MyCloudwatchSink
как sink.console.class
, и это, вероятно, неправильно, но я не уверен, что мне следует делать вместо этого.
Я использую почти дословно копия репортера азагиотов
private[spark] class MyCloudwatchSink(
val property: java.util.Properties,
val registry: com.codahale.metrics.MetricRegistry,
securityMgr: org.apache.spark.SecurityManager
) extends Sink {
private lazy val amazonCloudWatchAsync = AmazonCloudWatchAsyncClientBuilder
.standard()
.withRegion(Regions.US_EAST_1)
.build
private lazy val cloudWatchReporter = {
//TODO update registry prefix
CloudWatchReporter.forRegistry(registry, amazonCloudWatchAsync, "spark-metrics-example")
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(MetricFilter.ALL)
.withPercentiles(Percentile.P75, Percentile.P99)
.withOneMinuteMeanRate
.withFiveMinuteMeanRate
.withFifteenMinuteMeanRate
.withMeanRate
.withArithmeticMean
.withStdDev
.withStatisticSet
.withZeroValuesSubmission
.withReportRawCountValue
.withHighResolution
.withMeterUnitSentToCW(StandardUnit.Bytes)
.withJvmMetrics
.withGlobalDimensions("Region=us-west-2", "Instance=stage")
//.withDryRun //TODO remove this when ready!
.build
}
override def start(): Unit = cloudWatchReporter.start(30, TimeUnit.SECONDS)
override def stop(): Unit = cloudWatchReporter.stop()
override def report(): Unit = cloudWatchReporter.report()
}
Spark генерирует следующее исключение:
20/04/13 19:18:24 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 1 for reason Container marked as failed: container_1586789645936_0006_01_000002 on host: ip-10-0-248-202.ec2.internal. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1586789645936_0006_01_000002
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
at org.apache.hadoop.util.Shell.run(Shell.java:869)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:235)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Спасибо!