У меня есть сообщения kafka, похожие на следующий шаблон:
{ user: 'someUser', value: 'SomeValue' , timestamp:000000000}
С вычислением потока Flink, который выполняет некоторые действия по подсчету этих элементов.
Теперь я хотите объявить сеанс, собрать одно и то же значение user + в диапазоне X секунд как единое целое с последней отметкой времени, затем он будет перенаправлен в следующий поток только один раз
Так что я написал что-то вроде что:
data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Data>() {
.....
})
.keyBy(new KeySelector<Data, String>(){
.......
})
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.aggregate(new AggregateFunction<Data, Data, Data>() {
@Override
public Data createAccumulator() {
return null;
}
@Override
public Data add(Data value, Data accumulator) {
if(accumulator == null) {
accumulator = value;
}
return accumulator;
}
@Override
public Data getResult(Data accumulator) {
return accumulator;
}
@Override
public Data merge(Data a, Data b) {
return a;
}
});
Но проблема в том, что функция getResult вызывается для каждого элемента, а не только в конце окна.
Моя проблема заключается в том, чтобы не пересылать агрегацию результат до конца окна до следующего потока. насколько я знаю, также результат процесса потока движется вперед, когда больше нет элементов, даже если windows не конец да
любой совет?
Спасибо