Я слежу за приложением потоковой передачи искр с помощью prometheus. Вот как я запускаю свое приложение:
./bin/spark-submit --master spark://127.0.0.1:7077 --deploy-mode cluster --driver-cores 4 --name "TestStreamCombineByKey" --conf "spark.driver.extraJavaOptions=-javaagent:/home/flink/spark-2.4.6-bin-hadoop2.7/jars/jmx_prometheus_javaagent-0.13.0.jar=8082:/home/flink/spark-2.4.6-bin-hadoop2.7/conf/spark.yml" /home/felipe/workspace-idea/explore-spark/target/scala-2.11/explore-spark_2.11-0.1.jar
Файл spark.yml II получил от здесь на основе этого ответа .
Затем я настроил файл /etc/prometheus/prometheus.yml для очистки метрик от Spark.
- job_name: "spark_streaming_app"
scrape_interval: "5s"
static_configs:
- targets: ['localhost:8082']
Я могу видеть некоторые метрики из Spark на панели инструментов Prometheus. Я также могу видеть метрики, используя адрес http://localhost: 4040 / metrics / json /
Однако я бы хотел для отслеживания пропускной способности определенных c операторов моего искрового приложения. Например, количество записей, входящих и исходящих из преобразований source
, map
и combineByKey
. Как мне это контролировать?
val sparkConf = new SparkConf()
.setAppName("TaxiRideCountCombineByKey")
.setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val stream = ssc.receiverStream(new TaxiRideSource())
val driverStream = stream.map(taxiRide => (taxiRide.driverId, 1))
val countStream = driverStream.combineByKey(
(v) => (v, 1), //createCombiner
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //mergeValue
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // mergeCombiners
new HashPartitioner(3)
)
countStream.print()