Apache производительность задержки Flink - PullRequest
0 голосов
/ 03 февраля 2020

Впервые на вопрос. У меня вопрос по поводу Apache Флинк. Чтобы измерить производительность задержки для каждого параллельного числа Apache Flink, мы хотим суммировать разницу во времени между тем, когда окно создается и когда это окно генерируется для каждого окна.

Задержка рассчитывается с использованием system.currenttimemillis () и public long start_time определены в конструкторе Window с помощью Watermark Trigger, но значение довольно велико для ширины окна. В качестве причины этой странности я подумал, что Флинк повторно использует Окно. Что на самом деле происходит?

Кроме того, даже если start_time окна подставляется на стороне триггера, оно не изменилось. Что это должно делать?

Кроме того, если у вас есть лучший способ измерить время выживания Окна, я был бы признателен, если бы вы сказали мне. (Если Flink повторно использует окно, start_time из отброшенного окна будет использовано снова.)

private static class MyTrigger extends Trigger<Tuple2<Integer,Integer>,MyTimeWindow>{
File ratency_file;
MyTrigger(){
  super();
  try{
    ratency_file = new File(ratency_filePath);
    FileWriter filewriter = new FileWriter(ratency_file,false);
    filewriter.close();
  }catch(IOException e){
    System.out.println("IOException");
  }
}
@Override
public TriggerResult onElement(Tuple2<Integer,Integer> element, long timestamp, MyTimeWindow window, Trigger.TriggerContext ctx) throws IOException {
  if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
      int index = (int)(System.currentTimeMillis()-window.start_time);
      FileWriter filewriter = new FileWriter(ratency_file,true);
      filewriter.write(Integer.toString(index)+"\n");
      filewriter.close();
            return TriggerResult.FIRE_AND_PURGE;
      } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
      }
}
@Override
public TriggerResult onEventTime(long time, MyTimeWindow window, Trigger.TriggerContext ctx) throws IOException {
  TriggerResult res = time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PUREGE : TriggerResult.CONTINUE;
  if( res == TriggerResult.FIRE_AND_PURGE ){
    int index = (int)(System.currentTimeMillis()-window.start_time);
    FileWriter filewriter = new FileWriter(ratency_file,true);
    filewriter.write(Integer.toString(index)+"\n");
    filewriter.close();
  }
  return res;
}
@Override
public void clear(MyTimeWindow window, Trigger.TriggerContext ctx) throws IOException{
  ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public TriggerResult onProcessingTime(long timestamp, MyTimeWindow window , Trigger.TriggerContext ctx){
  TriggerResult res = TriggerResult.CONTINUE;
  return res;
}

}

1 Ответ

0 голосов
/ 03 февраля 2020

Вы используете время события windows, и, вычисляя System.currentTimeMillis()-window.start_time, когда окно готово к закрытию, вы смешиваете много разных источников задержки.

Временное окно события запускается, когда появляется водяной знак, который проходит через конец окна. Предполагая генератор водяных знаков BoundedOutOfOrderness, это не может произойти, пока не прибудет событие с отметкой времени, превышающей время окончания окна плюс задержка, настроенная для размещения событий не по порядку. Кроме того, водяной знак будет создаваться не сразу по прибытии такого события, а после истечения срока действия autowatermark.

Собирая все это вместе, измеряемая вами задержка включает в себя множество вещей, которые могут показаться довольно большими:

  • время, прошедшее между моментом, когда события имеют метку времени и когда они попадают в Flink ( потому что window.start_time основан на отметках времени в событиях, и вы сравниваете это с System.currentTimeMillis())
  • задержкой, налагаемой водяными знаками (для размещения событий не по порядку)
  • задержки из-за интервала автоматического добавления водяных знаков (по умолчанию: 200 мсек c)
  • задержки из сетевой буферизации (по умолчанию: 100 мсек c)
  • длительность окна
...