Учитывая ответ на ваш вопрос выше "Я просто хочу узнать в течение часового промежутка времени количество пользователей, которые выполнили определенное действие", я бы предложил следующее.
Предположим, у вас есть что-то вроде этого:
class ActionRecord {
String actionType;
String user;
}
Вы можете определить агрегатный класс примерно так:
class ActionRecordAggregate {
private Set<String> users = new HashSet<>();
public void add(ActionRecord rec) {
users.add(rec.getUser());
}
public int count() {
return users.size();
}
}
Тогда ваше потоковое приложение может:
- принять события
- перепишите их в соответствии с типом события (
.map()
)
- сгруппировать их по типу события (
.groupByKey()
)
- Окно их по времени (выбрано 1 минута, но YMMV)
- объединить их в
ActionRecordAggregate
- материализуйте их в StateStore
так это выглядит примерно так:
stream()
.map((key, val) -> KeyValue.pair(val.actionType, val))
.groupByKey()
.windowedBy(TimeWindows.of(60*1000))
.aggregate(
ActionRecordAggregate::new,
(key, value, agg) -> agg.add(value),
Materialized
.<String, ActionRecordAggregate, WindowStore<Bytes, byte[]>>as("actionTypeLookup")
.withValueSerde(getSerdeForActionRecordAggregate())
);
Затем, чтобы получить события обратно, вы можете запросить хранилище состояний:
ReadOnlyWindowStore<String, ActionRecordAggregate> store =
streams.store("actionTypeLookup", QueryableStoreTypes.windowStore());
WindowStoreIterator<ActionRecordAggregate> wIt =
store.fetch("actionTypeToGet", startTimestamp, endTimestamp);
int totalCount = 0;
while(wIt.hasNext()) {
totalCount += wIt.next().count();
}
// totalCount is the number of distinct users in your
// time interval that raised action type "actionTypeToGet"
Надеюсь, это поможет!