У меня в данный момент есть настройка Flink, и у меня запущено задание на EMR, и сейчас я пытаюсь добавить мониторинг, отправляя метрики в prometheus.
Я столкнулся с проблемой запуска Flink на EMR.Я использую Terraform для обеспечения EMR (я запускаю ansible после загрузки и запуска задания).Из коробки не похоже, что в дистрибутив Flink EMR входят дополнительные файлы jar (flink-metrics-prometheus, flink-cep и т. Д.).
Глядя на документацию Flink, там написано
"Чтобы использовать этот репортер, вы должны скопировать /opt/flink-metrics-prometheus-1.6.1.jar
в папку /lib
вашего дистрибутива Flink" https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter
Но при входе в главный узел EMR ни в / etc / flink, ни в / usr / lib / flink нет каталога с именем opts
, и я нигде не вижу flink-metrics-prometheus-1.6.1.jar
.
Я знаю, что у Flink есть другие дополнительные библиотеки, которые вы обычно должны копировать, если хотите использовать их, например, flink-cep, но я не уверен, как это сделать при использовании EMR.
Этоя получаю исключение, которое, как я полагаю, заключается в том, что он не может найти jar метрик в своем пути к классам.
java.lang.ClassNotFoundException: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:144)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:419)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:276)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:227)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
at org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint.main(YarnSessionClusterEntrypoint.java:137)
ресурс EMR в terraform
name = "ce-emr-flink-arn"
release_label = "emr-5.20.0" # 5.21.0 is not found, could be a region thing
applications = ["Flink"]
ec2_attributes {
key_name = "ce_test"
subnet_id = "${aws_subnet.ce_test_subnet_public.id}"
instance_profile = "${aws_iam_instance_profile.emr_profile.arn}"
emr_managed_master_security_group = "${aws_security_group.allow_all_vpc.id}"
emr_managed_slave_security_group = "${aws_security_group.allow_all_vpc.id}"
additional_master_security_groups = "${aws_security_group.external_connectivity.id}"
additional_slave_security_groups = "${aws_security_group.external_connectivity.id}"
}
ebs_root_volume_size = 100
master_instance_type = "m4.xlarge"
core_instance_type = "m4.xlarge"
core_instance_count = 2
service_role = "${aws_iam_role.iam_emr_service_role.arn}"
configurations_json = <<EOF
[
{
"Classification": "flink-conf",
"Properties": {
"parallelism.default": "8",
"state.backend": "RocksDB",
"state.backend.async": "true",
"state.backend.incremental": "true",
"state.savepoints.dir": "file:///savepoints",
"state.checkpoints.dir": "file:///checkpoints",
"web.submit.enable": "true",
"metrics.reporter.promgateway.class": "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter",
"metrics.reporter.promgateway.host": "${aws_instance.monitoring.private_ip}",
"metrics.reporter.promgateway.port": "9091",
"metrics.reporter.promgateway.jobName": "ce-test",
"metrics.reporter.promgateway.randomJobNameSuffix": "true",
"metrics.reporter.promgateway.deleteOnShutdown": "false"
}
}
]
EOF
}
Я подозреваю, что мне, возможно, придется скачатьJar на стадии начальной загрузки, но сначала хотел проверить это и посмотреть, есть ли примеры того, как это делается