Подумайте, я это делаю:
DataStream<POJO> ds = ...
ds.assignTimestampsAndWatermarks(CustomAssigner)
.windowAll(...)
.apply(someFunction) //THIS FUNCTION CHANGES THE TIMESTAMP FIELD IN THE EVENTS
.assignTimestampsAndWatermarks(AnotherCustomAssigner)
Это действительно?Я не знаю, являются ли водяные знаки / метки времени глобальными или просто сохраняются в потоке данных?
Редактировать
class POJO{
int timestamp;
String someDetail; //key by this
...
}
DataStream ds = ....
ds.assignTimeStampsAndWatermarks(new AssignerWithPunctuatedWatermarks(){
long maxTS = Long.MIN_VALUE;
Watermarks checkAndGetNextWater(POJO, p, long l){
maxTS = max(...)
return new Watermarks(maxTS);
}
long ExtractTS(POJO p, long l){
maxTS = max(...)
return p.timeStamp;
}
}).keyBy(someDetail property)
.window(TumblingWindow(1 min))
.apply(new AllWindowFunction<POJO, POJO, String, TimeWindow>(){
public void apply(...){
POJO newPOJO = ...;
for(POJO it : iterable){
newPOJO.timeStamp += ...
}
collector.collect(newPOJO);
}
})
Теперь мне интересно
Если яЯ должен назначить временные метки снова, потому что я хочу сделать windowAll
, а затем apply
снова.
assignTimestamp...
.windowAll(..)
.apply(some other allwindow function)