Пользовательский триггер Apache Flink с ProcessingTimeSessionWindow - PullRequest
0 голосов
/ 21 марта 2019

Я пытаюсь объединить объекты во входящем потоке по 2 критериям.

  1. Если общее количество объектов равно N, тогда объедините его и отправьте в нисходящий поток.
  2. Если время с момента последнего N объекта> = timeout, поместите его в корзину и отправьте в нисходящий поток.

Обе эти функции доступны в Flink отдельно как CountTrigger и ProcessingTimeSessionWindows.

Я пытаюсь объединить функциональность этих двух функций, чтобы создать собственный триггер и расширить ProcessingTimeSessionWindows, чтобы использовать этот триггер. Это вызывает второе условие, но не первое. Поскольку поток не является потоком с ключами, я не могу использовать ValueState для хранения счетчика, поэтому мне было интересно, какие у меня есть альтернативы для этого.

Код ниже:

public class ProcessingTimeCountSessionWindow extends ProcessingTimeSessionWindows {
    private static final long serialVersionUID = 786L;

    private final int count;

   private ProcessingTimeCountSessionWindow(int count, long timeout) {
       super(timeout);
       this.count = count;
   }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeCountTrigger.create(count);
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param count Max count of elements in session i.e. the upper bound on count gap between sessions
     * @param size The session timeout, i.e. the time gap between sessions
     * @return The policy.
     */
    public static ProcessingTimeCountSessionWindow withCountAndGap(int count, Time size) {
        return new ProcessingTimeCountSessionWindow(count, size.toMilliseconds());
    }

}

Пользовательский триггер ниже:

Триггер подсчета использует ReducingState, но в моем потоке нет ключа, поэтому он не работает.

public class ProcessingTimeCountTrigger extends Trigger<Object, TimeWindow> {

    private static final long serialVersionUID = 786L;

    private final int maxCount;

    private final ReducingStateDescriptor<Integer> countStateDesc =
            new ReducingStateDescriptor<>("window-count", new ReduceFunctions.IntSum(), IntSerializer.INSTANCE);

    private ProcessingTimeCountTrigger(int maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        ReducingState<Integer> count = ctx.getPartitionedState(countStateDesc);
        count.add(1);
        if (count.get() >= maxCount) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countStateDesc).clear();
    }

    public static ProcessingTimeCountTrigger create(int maxCount) {
        return new ProcessingTimeCountTrigger(maxCount);
    }
    @Override
    public String toString() {
        return "ProcessingTimeCountTrigger(" + maxCount + ")";
    }

}

1 Ответ

0 голосов
/ 21 марта 2019

Мне удалось решить эту проблему путем точного копирования CountTrigger и переопределения следующего:

@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    return TriggerResult.FIRE_AND_PURGE;
}

Мне также не нужно было расширять ProcessingTimeSessionWindow, поскольку я могу просто использовать созданный пользовательский триггер. К сожалению, мы не можем расширить CountTrigger из-за его закрытого конструктора, иначе это было бы лучшим решением.

Итак, окончательный код выглядит примерно так:

consoleInput.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
            .trigger(ProcessingTimeCountTrigger.of(10L))
            .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
        @Override
        public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            List<String> alphaList = new ArrayList<>();
            elements.forEach(alphaList::add);
            out.collect("Time is " + new Date().toString());
            out.collect("Total " + alphaList.size() + " elements in window");
        }
    })

Это отправляет данные в цепочке вниз по течению, если у нас есть 10 элементов ИЛИ прошло 10 секунд с тех пор, как мы последний раз видели элемент.

Код пользовательского триггера ниже:

public class ProcessingTimeCountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private ProcessingTimeCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "ProcessingTimeCountTrigger(" +  maxCount + ")";
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given count.
     *
     * @param maxCount The count of elements at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> ProcessingTimeCountTrigger<W> of(long maxCount) {
        return new ProcessingTimeCountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }

    }
}
...