Переназначение меток времени, водяных знаков в Flink? - PullRequest
0 голосов
/ 08 февраля 2019

Подумайте, я это делаю:

DataStream<POJO> ds = ...
ds.assignTimestampsAndWatermarks(CustomAssigner)
.windowAll(...)
.apply(someFunction) //THIS FUNCTION CHANGES THE TIMESTAMP FIELD IN THE EVENTS
.assignTimestampsAndWatermarks(AnotherCustomAssigner)

Это действительно?Я не знаю, являются ли водяные знаки / метки времени глобальными или просто сохраняются в потоке данных?

Редактировать

class POJO{
   int timestamp;
   String someDetail; //key by this
   ...
}

DataStream ds = ....

ds.assignTimeStampsAndWatermarks(new AssignerWithPunctuatedWatermarks(){
   long maxTS = Long.MIN_VALUE;
   Watermarks checkAndGetNextWater(POJO, p, long l){
  maxTS = max(...)
  return new Watermarks(maxTS);
}

long ExtractTS(POJO p, long l){
  maxTS = max(...)
  return p.timeStamp;
}


  }).keyBy(someDetail property)
     .window(TumblingWindow(1 min))
      .apply(new AllWindowFunction<POJO, POJO, String, TimeWindow>(){
  public void apply(...){
    POJO newPOJO = ...;
    for(POJO it : iterable){
      newPOJO.timeStamp += ...
    }
    collector.collect(newPOJO);
  }
}) 

Теперь мне интересно

Если яЯ должен назначить временные метки снова, потому что я хочу сделать windowAll, а затем apply снова.

assignTimestamp...
.windowAll(..)
.apply(some other allwindow function)

1 Ответ

0 голосов
/ 09 февраля 2019

От вас не ожидается повторного вызова assignTimestampsAndWatermarks.Flink будет игнорировать временные метки в POJO, созданных WindowFunction, и будет метить временные записи потоковых записей, оборачивая эти события временными метками, полученными из времени в конце этого окна.Обычно это работает просто отлично, хотя последующее окно должно охватывать временной интервал, кратный первому целому числу.

Если вы пытаетесь создать новый поток, который должен быть повторно отмечен и иметь егоЕсли у вас есть новые водяные знаки, он может снова вызвать метод assignTimestampsAndWatermarks.

...