Впервые на вопрос. У меня вопрос по поводу Apache Флинк. Чтобы измерить производительность задержки для каждого параллельного числа Apache Flink, мы хотим суммировать разницу во времени между тем, когда окно создается и когда это окно генерируется для каждого окна.
Задержка рассчитывается с использованием system.currenttimemillis ()
и public long start_time
определены в конструкторе Window с помощью Watermark Trigger, но значение довольно велико для ширины окна. В качестве причины этой странности я подумал, что Флинк повторно использует Окно. Что на самом деле происходит?
Кроме того, даже если start_time окна подставляется на стороне триггера, оно не изменилось. Что это должно делать?
Кроме того, если у вас есть лучший способ измерить время выживания Окна, я был бы признателен, если бы вы сказали мне. (Если Flink повторно использует окно, start_time
из отброшенного окна будет использовано снова.)
private static class MyTrigger extends Trigger<Tuple2<Integer,Integer>,MyTimeWindow>{
File ratency_file;
MyTrigger(){
super();
try{
ratency_file = new File(ratency_filePath);
FileWriter filewriter = new FileWriter(ratency_file,false);
filewriter.close();
}catch(IOException e){
System.out.println("IOException");
}
}
@Override
public TriggerResult onElement(Tuple2<Integer,Integer> element, long timestamp, MyTimeWindow window, Trigger.TriggerContext ctx) throws IOException {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
int index = (int)(System.currentTimeMillis()-window.start_time);
FileWriter filewriter = new FileWriter(ratency_file,true);
filewriter.write(Integer.toString(index)+"\n");
filewriter.close();
return TriggerResult.FIRE_AND_PURGE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, MyTimeWindow window, Trigger.TriggerContext ctx) throws IOException {
TriggerResult res = time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PUREGE : TriggerResult.CONTINUE;
if( res == TriggerResult.FIRE_AND_PURGE ){
int index = (int)(System.currentTimeMillis()-window.start_time);
FileWriter filewriter = new FileWriter(ratency_file,true);
filewriter.write(Integer.toString(index)+"\n");
filewriter.close();
}
return res;
}
@Override
public void clear(MyTimeWindow window, Trigger.TriggerContext ctx) throws IOException{
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public TriggerResult onProcessingTime(long timestamp, MyTimeWindow window , Trigger.TriggerContext ctx){
TriggerResult res = TriggerResult.CONTINUE;
return res;
}
}