Установка таймера на минимальную отметку времени - PullRequest
1 голос
/ 30 апреля 2019

Я хотел бы установить таймер во время события, которое срабатывает на основе наименьшей отметки времени, видимой в элементах моего DoFn.

1 Ответ

1 голос
/ 30 апреля 2019

По соображениям производительности API таймера не поддерживает операцию чтения (), которая в подавляющем большинстве случаев не является обязательной функцией.В небольшом наборе случаев использования, где это необходимо, например, когда вам нужно установить Timer в EventTime на основе наименьшей отметки времени, видимой в элементах внутри DoFn, мы можем использовать объект State для отслеживания значения.

Java (SDK 2.10.0)

    // In this pattern, a Timer is set to fire based on the lowest timestamp seen in the DoFn. 
public class SetEventTimeTimerBasedOnEarliestElementTime {

  private static final Logger LOG = LoggerFactory
      .getLogger(SetEventTimeTimerBasedOnEarliestElementTime.class);

  public static void main(String[] args) {

    // Create pipeline
    PipelineOptions options = PipelineOptionsFactory.
        fromArgs(args).withValidation().as(PipelineOptions.class);

    // We will start our timer at a fixed point
    Instant now = Instant.parse("2000-01-01T00:00:00Z");

    // ----- Create some dummy data

    // Create 3 elements, incrementing by 1 minute
    TimestampedValue<KV<String, Integer>> time_1 = TimestampedValue.of(KV.of("Key_A", 1), now);

    TimestampedValue<KV<String, Integer>> time_2 = TimestampedValue
        .of(KV.of("Key_A", 2), now.plus(Duration.standardMinutes(1)));

    TimestampedValue<KV<String, Integer>> time_3 = TimestampedValue
        .of(KV.of("Key_A", 3), now.plus(Duration.standardMinutes(2)));

    Pipeline p = Pipeline.create(options);

    // Apply a fixed window of duration 10 min and Sum the results
    p.apply(Create.timestamped(time_3, time_2, time_1)).apply(
        Window.<KV<String, Integer>>into(FixedWindows.<Integer>of(Duration.standardMinutes(10))))
        .apply(ParDo.of(new StatefulDoFnThatSetTimerBasedOnSmallestTimeStamp()));

    p.run();
  }

  /**
   * Set timer to the lowest value that we see in the stateful DoFn
   */
  public static class StatefulDoFnThatSetTimerBasedOnSmallestTimeStamp
      extends DoFn<KV<String, Integer>, KV<String, Integer>> {

    // Due to performance considerations there is no read on a timer object.
    // We make use of this Long value to keep track.
    @StateId("currentTimerValue") private final StateSpec<ValueState<Long>> currentTimerValue =
        StateSpecs.value(BigEndianLongCoder.of());

    @TimerId("timer") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement public void process(ProcessContext c,
        @StateId("currentTimerValue") ValueState<Long> currentTimerValue,
        @TimerId("timer") Timer timer) {

      Instant timeStampWeWantToSet = c.timestamp();

      //*********** Set Timer

      // If the timer has never been set then we set it.
      // If the timer has been set but is larger than our current value then we set it.
      if (currentTimerValue.read() == null || timeStampWeWantToSet.getMillis() < currentTimerValue
          .read()) {

        timer.set(timeStampWeWantToSet);
        currentTimerValue.write(timeStampWeWantToSet.getMillis());
      }

    }

    @OnTimer("timer") public void onMinTimer(OnTimerContext otc,
        @StateId("currentTimerValue") ValueState<Long> currentTimerValue,
        @TimerId("timer") Timer timer) {

      // Reset the currentTimerValue
      currentTimerValue.clear();

      LOG.info("Timer @ {} fired", otc.timestamp());

    }

  }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...