Чего я хочу добиться, так это получить счетчик каждого сообщения, присутствующего в записи, на основе отметки времени, присутствующей в сообщении. Каждая запись состоит из List<Metric>
объекта. Я хотел бы извлечь временную метку для каждого метри c и объединить метри c на основе имени метри c.
Метри c
public class Metric {
String metric;
Long timestamp;
Double value;
}
Пользовательский экстрактор меток времени
Я реализовал этот экстрактор меток времени, который преобразует запись в объект списка. И в настоящее время он выбирает первую временную метку, которая выполняет управление окнами для этого ArrayList.
public class EventTimestampExtractor implements TimestampExtractor {
public long extract(ConsumerRecord<Object, Object> record, long previousTimeStamp) {
try {
// Have a ListSerde in place to deserialize the record to a List<Metric> object.
final List<Metric> value = (List<Metric>) record.value();
final Metric metric = value.get(0); // Returning the first timestamp from the metric list.
return metric.getTimestamp();
}
catch (Exception e) {
// If there is an exception, return back the event time.
return record.timestamp();
}
}
}
Топология
Как только я получаю список, я выполняю FlatTransform, чтобы преобразовать это Составьте список и выполните агрегирование на основе сведенного списка.
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, List<Metric>> stream = builder.stream(inputTopic, Consumed.with(Serdes.String(),new MetricListSerde()));
TimeWindows windows = TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofSeconds(2));
stream.filter((key, value) -> value != null)
.flatTransform(() -> new MetricsTransformer()) // Flat transforming the list to single metrics
.groupByKey()
.windowedBy(windows)
.count()
.toStream()
.to("output-topic");
Metri c Пример списка - Если вы заметили, что есть один показатель c и 3 счета (2 между 0 -10 и 1 через 10 секунд)
[{ "metric": "metric1.count",
"timestamp": 1,
"value": 30
},{
"metric": "metric1.count",
"timestamp": 2,
"value": 30
}, {
"metric": "metric1.count",
"timestamp": 15,
"value": 30
}]
Мое окно 10 секунд, и я хотел бы получить счетчик для метри c. Мой текущий результат выглядит как -
Window{startMs=0, endMs=10} and Value metric: metric1.count value: 3 aggregator: count interval: "10s"}
Ожидаемый результат -
Window{startMs=0, endMs=10} and Value metric: metric1.count value: 2 aggregator: count interval: "10s"}
Window{startMs=10, endMs=20} and Value metric: metric1.count value: 1 aggregator: count interval: "10s"}
Извините за длинный вопрос, но есть ли способ извлечь несколько временных меток из одной записи, которая содержит коллекцию сообщений?
Версия Kafka Streams - 2.4.1