Flink: Как запустить таймер времени обработки в CustomTrigger только на первом элементе? - PullRequest
0 голосов
/ 29 января 2019

Я использую GlobalWindow с пользовательским Trigger для своего приложения.Согласно требованию, в функции Trigger мне нужно запустить таймер времени обработки только для первого элемента в окне.

Я пытался реализовать его с помощью переменной firstEventflag.Вот так.

.window(GlobalWindows.create())
.trigger(new Trigger<ImpactEventObject, GlobalWindow>() {
    Boolean firstEventflag = false;

    @Override
    public TriggerResult onElement(ImpactEventObject impactEventObject, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
        if (!firstEventflag) {
            firstEventflag = true;
            triggerContext.registerProcessingTimeTimer(
                triggerContext.getCurrentProcessingTime() + 20000);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
    return TriggerResult.FIRE;
}

Но это не удается, потому что сегодня я обнаружил, что переменная firstEventflag не инициализируется каждый раз, когда создается новое окно, это зависит от подзадачи, которая обрабатывает окно, то есть разные окнаможет использовать одну и ту же переменную firstEventflag, что делает эту логику практически бесполезной.Учитывая это, как мне решить мою проблему?

1 Ответ

0 голосов
/ 29 января 2019

Мы нашли способ сделать это, посмотрев на исходный код CountTrigger здесь .

Мы можем сохранить количество элементов в GlobalWindow с помощьюReducingStateDescriptor.И запускать таймер, когда этот счетчик равен 1, что означает - запускать таймер только на первом элементе.

public class CustomTrigger extends Trigger<GenericObject, GlobalWindow> {

private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

@Override
public TriggerResult onElement(ImpactEventObject impactEventObject, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
    ReducingState<Long> count = triggerContext.getPartitionedState(stateDesc);
    count.add(1L);

    if (count.get() == 1) {
        triggerContext.registerProcessingTimeTimer(
            triggerContext.getCurrentProcessingTime() + 20000);
    }
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
    return TriggerResult.FIRE;
}

@Override
public TriggerResult onEventTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
    return null;
}

@Override
public void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
    triggerContext.deleteProcessingTimeTimer(triggerContext.getCurrentProcessingTime());
}

private static class Sum implements ReduceFunction<Long> {
    private static final long serialVersionUID = 1L;
    @Override
    public Long reduce(Long value1, Long value2) throws Exception {
        return value1 + value2;
    }

}
}
...