Столкновение имени Метрики Flink - PullRequest
0 голосов
/ 30 октября 2018

Работа My Flink (1.6) прослушивает поток и выполняет некоторую агрегацию. Я хочу собрать показатели после агрегации, но у меня возникли некоторые трудности.

Мои метрики выглядят так:

id_1, 0.1
id_2, 0.3
...

Идентификаторы будут переменными, а значения будут увеличиваться и уменьшаться с течением времени, так что это выглядело как Датчик , который был наиболее подходящим.

Я создал эту функцию карты, чтобы фиксировать эти метрики в шкале:

class MetricsMapper extends RichMapFunction[MyObject, Double] {

  override def map(obj: MyObject): Double = {
    val metricVal = obj.metricVal
    getRuntimeContext.getMetricGroup.gauge[Double, ScalaGauge[Double]](obj.id, ScalaGauge[Double](() => metricVal))
    metricVal
  }
}

Как это показывает, я использую свойство id моего объекта для регистрации датчика.

Проблема, с которой я столкнулся, заключается в том, что я получаю это предупреждение при запуске задания:

Name collision: Group already contains a Metric with the name "x" Metric will not be reported

Я интерпретирую это, поскольку мы уже создали этот датчик ранее в потоке, а новое значение игнорируется. Есть ли способ преодолеть это?

Спасибо

Ответы [ 2 ]

0 голосов
/ 30 октября 2018

Вы должны следовать шаблону, показанному в документации :

new class MyMapper extends RichMapFunction[MyObject, Double] {
  @transient private var valueToExpose = 0.0

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge[Double, ScalaGauge[Double]]("MyGauge", ScalaGauge[Double]( () => valueToExpose ) )
  }

  override def map(obj: MyObject): String = {
    valueToExpose = obj.metricval
    valueToExpose
  }
}

Другими словами, зарегистрируйте датчик один раз в методе open () и обновляйте значение каждый раз, когда вызывается map ().

В вашем случае вам нужен отдельный датчик для каждого уникального идентификатора объекта. Если вы действительно хотите сделать это с метриками, вам нужно будет хранить что-то вроде хэш-карты датчиков, создавая новые по мере необходимости и обновляя значение соответствующего датчика в функции map (). Или лучше, введите ваш поток по id.

Еще один фактор, который следует учитывать при рассмотрении вопроса о целесообразности использования метрик, заключается в том, что метрики не являются контрольными точками.

0 голосов
/ 30 октября 2018

Вы уверены, что хотите использовать метрики здесь? Метрики обычно используются в качестве средства для наблюдения за выполнением работы. Обычные значения, для которых вы хотите использовать метрики:

  • записей в секунду,
  • поздние события
  • количество поврежденных событий и т. Д.

В вашем случае я бы предпочел использовать какой-нибудь побочный конвейер, производящий эти агрегаты.

...