allowLateness для пользовательского триггера Global Window - PullRequest
1 голос
/ 19 февраля 2020

Я создал пользовательский триггер и функцию обработки для своего потока событий.

DataStream<DynamoDBRow> dynamoDBRows =
    sensorEvents
        .keyBy("id")
        .window(GlobalWindows.create())
        .trigger(new MyCustomTrigger())
        .allowedLateness(Time.minutes(1)) # Note
        .process(new MyCustomWindowProcessFunction());

Мой триггер основан на параметре события. После получения сигнала об окончании события MyCustomWindowProcessFunction () применяется к элементам окна.

@Slf4j
public class MyCustomTrigger extends Trigger<SensorEvent, GlobalWindow> {

  @Override
  public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

    if (element.isEventEnd() == true) {
      return TriggerResult.FIRE_AND_PURGE;
    }

    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}

Может быть несколько данных датчика, которые могут поступить даже после запуска. Поэтому я добавил .allowedLateness(Time.minutes(1)), чтобы гарантировать, что эти события не будут пропущены при обработке.

В моем случае allowLateness не работает.

После перехода в документах я нашел это

allowedLateness is not applicable for Global Window

Как включить allowLateness в GlobalWindow?

Примечание : Я также пытался установить параметры времени среды c

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

Обновление: 20-02-2020

В настоящее время думаю о следующем подходе , (Пока не работает)

@Slf4j
public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {

  private final long allowedLatenessMillis;

  public JourneyTrigger(Time allowedLateness) {
    this.allowedLatenessMillis = allowedLateness.toMilliseconds();
  }

  @Override
  public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

    if (element.isEventEnd() == true) {
      log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
      ctx.registerEventTimeTimer(System.currentTimeMillis() + allowedLatenessMillis);
    }

    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    log.info("onEvenTime called at "+System.currentTimeMillis() );
    return TriggerResult.FIRE_AND_PURGE;
  }


  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}

Ответы [ 2 ]

0 голосов
/ 19 февраля 2020

Наконец, я смог выполнить свое требование, используя приведенный ниже пользовательский триггер.

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

@Slf4j
public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {

  private final long allowedLatenessMillis;

  public JourneyTrigger(Time allowedLateness) {
    this.allowedLatenessMillis = allowedLateness.toMilliseconds();
  }

  @Override
  public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

    if (element.isEventEnd()==true) {
      log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
      ctx.registerProcessingTimeTimer(System.currentTimeMillis() + allowedLatenessMillis);
    }

    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    log.info("onProcessingTime called at "+System.currentTimeMillis() );
    return TriggerResult.FIRE_AND_PURGE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }



  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}

Также в классе Driver.java установите среду Time Characteristi c

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
0 голосов
/ 19 февраля 2020

Если честно, я не вижу смысла использовать GlobalWindow здесь. Вы можете просто использовать KeyedProcessFunction, который будет служить той же цели, что и Ваш Trigger, в основном, он соберет все элементы с начала события до конца события в ListState, а затем, когда вы получите isEventEnd()==true, Вы можете просто запланировать таймер EventTime, который сработает через минуту и ​​выдаст результаты, собранные внутри ListState.

...