Kafka Streams - извлечение временной метки для списка объектов для каждой записи - PullRequest
0 голосов
/ 02 мая 2020

Чего я хочу добиться, так это получить счетчик каждого сообщения, присутствующего в записи, на основе отметки времени, присутствующей в сообщении. Каждая запись состоит из List<Metric> объекта. Я хотел бы извлечь временную метку для каждого метри c и объединить метри c на основе имени метри c.

Метри c

public class Metric {

    String metric;
    Long timestamp;
    Double value;
}

Пользовательский экстрактор меток времени

Я реализовал этот экстрактор меток времени, который преобразует запись в объект списка. И в настоящее время он выбирает первую временную метку, которая выполняет управление окнами для этого ArrayList.

public class EventTimestampExtractor implements TimestampExtractor {

    public long extract(ConsumerRecord<Object, Object> record, long previousTimeStamp) {
        try {
            // Have a ListSerde in place to deserialize the record to a  List<Metric> object.
            final List<Metric> value = (List<Metric>) record.value();
            final Metric metric = value.get(0); // Returning the first timestamp from the metric list. 
            return metric.getTimestamp();
        }
        catch (Exception e) {
            // If there is an exception, return back the event time.
            return record.timestamp();
        }
    }
}

Топология

Как только я получаю список, я выполняю FlatTransform, чтобы преобразовать это Составьте список и выполните агрегирование на основе сведенного списка.

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, List<Metric>> stream = builder.stream(inputTopic, Consumed.with(Serdes.String(),new MetricListSerde()));

TimeWindows windows = TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofSeconds(2));

stream.filter((key, value) -> value != null)
                .flatTransform(() -> new MetricsTransformer()) // Flat transforming the list to single metrics
                .groupByKey()
                .windowedBy(windows)
                .count()
                .toStream()
                .to("output-topic");

Metri c Пример списка - Если вы заметили, что есть один показатель c и 3 счета (2 между 0 -10 и 1 через 10 секунд)

[{  "metric": "metric1.count",
    "timestamp": 1,
    "value": 30
},{
    "metric": "metric1.count",
    "timestamp": 2,
    "value": 30
}, {
    "metric": "metric1.count",
    "timestamp": 15,
    "value": 30
}]

Мое окно 10 секунд, и я хотел бы получить счетчик для метри c. Мой текущий результат выглядит как -

Window{startMs=0, endMs=10} and Value metric: metric1.count value: 3  aggregator: count interval: "10s"}

Ожидаемый результат -

Window{startMs=0, endMs=10} and Value metric: metric1.count value: 2  aggregator: count interval: "10s"}
Window{startMs=10, endMs=20} and Value metric: metric1.count value: 1  aggregator: count interval: "10s"}

Извините за длинный вопрос, но есть ли способ извлечь несколько временных меток из одной записи, которая содержит коллекцию сообщений?

Версия Kafka Streams - 2.4.1

1 Ответ

0 голосов
/ 03 мая 2020

TimestampExtractor не помогает в вашем случае использования, потому что он может дать вам только одну временную метку. При использовании flatMap() все выходные записи наследуют временную метку входной записи.

Если вам нужно оперативно изменять временную метку, вам нужно использовать transform() для реализации «плоской карты». Для каждой входной записи вы можете вызывать context.forward() несколько раз, чтобы выполнить фактически плоское отображение (вы можете просто return null; в конце, чтобы не создавать никаких дополнительных записей). В каждом вызове forward() вы можете установить новую метку времени с помощью To.all().withTimestamp(...):

public KeyValue transform(K key, V value) {
    for (...) {
       context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp);
    }
    return null;
}
...