Приложение собирает тысячи метрик в теме Кафки «raw-метрики». Хранение их в необработанном формате и выборка для отображения в пользовательском интерфейсе требует значительных ресурсов и медленно Я хочу агрегировать каждую метрику и вычислять «Среднее, Сумма, Мин., Макс., Среднее, Медиана, Дисперсия» и т. Д. Для каждого часа / дня и сохранять агрегированные значения в базе данных.
Приложение использует Kafka v1.1 и пытается узнать, могут ли потоки Kafka помочь вам в этом.
Входной метрический поток -
отметка времени: t1
metricID: m1
metricName: HTTPRequest
metricValue: 10
отметка времени: t1
metricID: м2
metricName: HTTPResponse
metricValue: 9
отметка времени: t2
metricID: m1
metricName: HTTPRequest
metricValue: 20
Ожидаемый результат -
Метрическая | Метка времени | Медиана | Сумма | Avg | Дисперсия | Среднее значение
HTTPRequest 2019-01-01 05: 30: 00 + 0530 до 2019-01-01 06: 30: 00 + 0530 X Y Z Q
HTTPRequest 2019-01-01 05: 30: 00 + 0530 до 2019-01-01 06: 30: 00 + 0530 X Y Z Q
Вот мой подход -
- Считать фид темы 'raw-metric' в виде потока
- Используйте падающие окна одного часа
- Агрегировать метрику и вычислить необходимые свернутые значения
- стрим в "свернутую" тему кафки
metricStream
.filter((k, v) -> (MetricFilter::Validate))
.map((k, v) -> new KeyValue<>(v.metricID, v))
.through(stringSerde, metricStreamSerde, "topic.metric.output")
.groupBy((k, v) -> k, stringSerde, metricStreamSerde)
.aggregate(MetricStreamCollector::new, (k, v, MetricStreamCollector) -> MetricStreamCollector.add(v),
TimeWindows.of(1 * 60 * 1000).until(60000L), collectorSerde,
"topic.metric.aggregation")
.to(windowedSerde, collectorSerde, new MetricStreamPartitioner(), "topic.metric.rollup"));
Пожалуйста, помогите мне понять следующее -
- Как обработать исключение, если оно есть? например, что если агрегат выдает исключение?
- Как рассчитать свертку, такую как Среднее, верхний квартиль, Медиана и дисперсия для каждой метрики в этой топологии?
Спасибо.