Как контролировать пропускную способность определенных c операторов в Spark Streaming с помощью Prometheus? - PullRequest
0 голосов
/ 18 июня 2020

Я слежу за приложением потоковой передачи искр с помощью prometheus. Вот как я запускаю свое приложение:

./bin/spark-submit --master spark://127.0.0.1:7077 --deploy-mode cluster --driver-cores 4 --name "TestStreamCombineByKey" --conf "spark.driver.extraJavaOptions=-javaagent:/home/flink/spark-2.4.6-bin-hadoop2.7/jars/jmx_prometheus_javaagent-0.13.0.jar=8082:/home/flink/spark-2.4.6-bin-hadoop2.7/conf/spark.yml" /home/felipe/workspace-idea/explore-spark/target/scala-2.11/explore-spark_2.11-0.1.jar

Файл spark.yml II получил от здесь на основе этого ответа .

Затем я настроил файл /etc/prometheus/prometheus.yml для очистки метрик от Spark.

  - job_name: "spark_streaming_app"
    scrape_interval: "5s"
    static_configs:
      - targets: ['localhost:8082']

Я могу видеть некоторые метрики из Spark на панели инструментов Prometheus. Я также могу видеть метрики, используя адрес http://localhost: 4040 / metrics / json /

enter image description here

Однако я бы хотел для отслеживания пропускной способности определенных c операторов моего искрового приложения. Например, количество записей, входящих и исходящих из преобразований source, map и combineByKey. Как мне это контролировать?

val sparkConf = new SparkConf()
  .setAppName("TaxiRideCountCombineByKey")
  .setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(1))

val stream = ssc.receiverStream(new TaxiRideSource())
val driverStream = stream.map(taxiRide => (taxiRide.driverId, 1))
val countStream = driverStream.combineByKey(
  (v) => (v, 1), //createCombiner
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // mergeCombiners
  new HashPartitioner(3)
)
countStream.print()
...