Kafka Streams для свертывания метрических данных - PullRequest
0 голосов
/ 21 января 2019

Приложение собирает тысячи метрик в теме Кафки «raw-метрики». Хранение их в необработанном формате и выборка для отображения в пользовательском интерфейсе требует значительных ресурсов и медленно Я хочу агрегировать каждую метрику и вычислять «Среднее, Сумма, Мин., Макс., Среднее, Медиана, Дисперсия» и т. Д. Для каждого часа / дня и сохранять агрегированные значения в базе данных.

Приложение использует Kafka v1.1 и пытается узнать, могут ли потоки Kafka помочь вам в этом.

Входной метрический поток -

отметка времени: t1 metricID: m1 metricName: HTTPRequest metricValue: 10

отметка времени: t1 metricID: м2 metricName: HTTPResponse metricValue: 9

отметка времени: t2 metricID: m1 metricName: HTTPRequest metricValue: 20

Ожидаемый результат - Метрическая | Метка времени | Медиана | Сумма | Avg | Дисперсия | Среднее значение


HTTPRequest 2019-01-01 05: 30: 00 + 0530 до 2019-01-01 06: 30: 00 + 0530 X Y Z Q

HTTPRequest 2019-01-01 05: 30: 00 + 0530 до 2019-01-01 06: 30: 00 + 0530 X Y Z Q

Вот мой подход -

  1. Считать фид темы 'raw-metric' в виде потока
  2. Используйте падающие окна одного часа
  3. Агрегировать метрику и вычислить необходимые свернутые значения
  4. стрим в "свернутую" тему кафки
metricStream
                .filter((k, v) -> (MetricFilter::Validate))
                .map((k, v) -> new KeyValue<>(v.metricID, v))
                .through(stringSerde, metricStreamSerde, "topic.metric.output")
                .groupBy((k, v) -> k, stringSerde, metricStreamSerde)
                .aggregate(MetricStreamCollector::new, (k, v, MetricStreamCollector) -> MetricStreamCollector.add(v),
                        TimeWindows.of(1 * 60 * 1000).until(60000L), collectorSerde,
                        "topic.metric.aggregation")
                .to(windowedSerde, collectorSerde, new MetricStreamPartitioner(), "topic.metric.rollup"));

Пожалуйста, помогите мне понять следующее -

  1. Как обработать исключение, если оно есть? например, что если агрегат выдает исключение?
  2. Как рассчитать свертку, такую ​​как Среднее, верхний квартиль, Медиана и дисперсия для каждой метрики в этой топологии?

Спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...