(как-то связано с Как создать динамическую метрику в Flink )
У меня есть поток events(someid:String, name:String)
, и для целей мониторинга мне нужен счетчик на идентификатор события. Во всех документациях и примерах Flink я вижу, что счетчик, например, инициализируется именем в open
функции карты.
Но в моем случае я не могу инициализировать счетчик какМне понадобится один для eventId, и я не знаю заранее значение. Кроме того, я понимаю, как дорого будет создавать новый счетчик каждый раз, когда в методе MapFunction используется map()
метод даже. Наконец, я не могу сохранить «кэш» счетчиков, так как он будет слишком большим.
В идеале я хотел бы что-то вроде этого:
class Event(id: String, name: String)
class ExampleMapFunction extends RichMapFunction[Event, Event] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = new Counter()
}
override def map(event: Event): Event = {
counter.inc(event.id)
event
}
}
Или в принципе я мог бы реализовать свой собственныйсчетчик, который позволяет мне пройти измерение? если да, то как?
Есть ли какие-либо советы или рекомендации для такого варианта использования?