У меня есть поток kafka, и мне нужен процессор, который выполняет следующие действия:
Использует 45-секундное скачкообразное окно с 5-секундным опережением, чтобы вычислить первые 5 отсчетов на основе одного измерения объекта домена. Например, если поток будет содержать данные Clickstream, мне понадобятся первые 5 URL-адресов, которые просматриваются по имени домена, но также отображаются в виде окна с перескоком.
Я видел примеры для подсчета окон, например:
KStream<String, GenericRecord> pageViews = ...;
// Count page views per window, per user, with hopping windows of size 5 minutes that advance every 1 minute
KTable<Windowed<String>, Long> windowedPageViewCounts = pageViews
.groupByKey(Grouped.with(Serdes.String(), genericAvroSerde))
.windowedBy(TimeWindows.of(Duration.ofMinutes(5).advanceBy(Duration.ofMinutes(1))))
.count()
И Top n агрегаций на MusicExample, например:
songPlayCounts.groupBy((song, plays) ->
KeyValue.pair(TOP_FIVE_KEY,
new SongPlayCount(song.getId(), plays)),
Grouped.with(Serdes.String(), songPlayCountSerde))
.aggregate(TopFiveSongs::new,
(aggKey, value, aggregate) -> {
aggregate.add(value);
return aggregate;
},
(aggKey, value, aggregate) -> {
aggregate.remove(value);
return aggregate;
},
Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_STORE)
.withKeySerde(Serdes.String())
.withValueSerde(topFiveSerde)
);
Мне просто не удается объединить 2 - где я получаю как оконное, так и верхнее русских скоплений. Есть мысли?