APACHE FLINK AggregateFunction с tumblingWindow для подсчета событий, но также для отправки 0, если событие не произошло - PullRequest
0 голосов
/ 17 июня 2020

Мне нужно подсчитать события в переворачивающемся окне. Но я также хочу отправлять события со значением 0, если в окне не было событий.

Что-то вроде.

  1. windowCount: 5
  2. windowCount: 0
  3. windowCount: 0
  4. windowCount: 3
  5. windowCount: 0 ...
import com.google.protobuf.Message;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.skydivin4ng3l.cepmodemon.models.events.aggregate.AggregateOuterClass;

public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(T event, Long accumulator) {
        return accumulator + 1L;
    }

    @Override
    public AggregateOuterClass.Aggregate getResult(Long accumulator) {
        return AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
    }

    @Override
    public Long merge(Long accumulator1, Long accumulator2) {
        return accumulator1 + accumulator2;
    }
}

и используется здесь

DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new BasicCounter<MonitorOuterClass.Monitor>());

TimeCharacteristics - ingestionTime

Я читал о функции TiggerFunction, которая может определять, получил ли агрегированный поток событие по прошествии x времени, но я не уверен, что это правильный способ сделать это.

Я ожидал, что агрегирование произойдет, даже если в окне вообще не будет событий. Может быть, есть настройка, о которой я не знаю?

Спасибо за любые подсказки.

Ответы [ 2 ]

0 голосов
/ 24 июня 2020

Я выбрал вариант 1, как было предложено @ David-Anderson:

Вот мой Генератор событий:

public class EmptyEventSource implements SourceFunction<MonitorOuterClass.Monitor> {

    private volatile boolean isRunning = true;

    private final long delayPerRecordMillis;

    public EmptyEventSource(long delayPerRecordMillis){
        this.delayPerRecordMillis = delayPerRecordMillis;
    }

    @Override
    public void run(SourceContext<MonitorOuterClass.Monitor> sourceContext) throws Exception {
        while (isRunning) {
            sourceContext.collect(MonitorOuterClass.Monitor.newBuilder().build());

            if (delayPerRecordMillis > 0) {
                Thread.sleep(delayPerRecordMillis);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

и моя настроенная AggregateFunction:

public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(T event, Long accumulator) {
        if(((MonitorOuterClass.Monitor)event).equals(MonitorOuterClass.Monitor.newBuilder().build())) {
            return accumulator;
        }

        return accumulator + 1L;
    }

    @Override
    public AggregateOuterClass.Aggregate getResult(Long accumulator) {
        AggregateOuterClass.Aggregate newAggregate = AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
        return newAggregate;
    }

    @Override
    public Long merge(Long accumulator1, Long accumulator2) {
        return accumulator1 + accumulator2;
    }
}

Использовал их вот так:

DataStream<MonitorOuterClass.Monitor> someEntryStream = env.addSource(currentConsumer);
DataStream<MonitorOuterClass.Monitor> triggerStream = env.addSource(new EmptyEventSource(delayPerRecordMillis));
DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
                        .union(triggerStream)
                        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .aggregate(new BasicCounter<MonitorOuterClass.Monitor>());
0 голосов
/ 18 июня 2020

Флинки windows создаются лениво, когда первое событие назначается окну. Таким образом, пустые windows не существуют и не могут давать результатов.

В общем, есть три способа решения этой проблемы:

  1. Поместите что-нибудь перед окном, что добавляет события в поток, гарантируя, что в каждом окне есть что-то в нем, а затем изменяет обработку окна, чтобы игнорировать эти специальные события при вычислении их результатов.
  2. Используйте GlobalWindow вместе с настраиваемым триггером, который использует таймеры обработки для запуска окна (при отсутствии текущих событий водяной знак не продвигается, а таймеры времени событий не срабатывают, пока не прибудет больше событий).
  3. Не используйте оконный API и реализуйте собственное управление окнами с ProcessFunction вместо этого. Но здесь вы все равно столкнетесь с проблемой необходимости использования таймеров времени обработки.

Обновление:

Сделав сейчас попытку реализовать пример варианта 2, я не могу рекомендовать Это. Проблема в том, что даже с пользовательским Trigger, ProcessAllWindowFunction не будет вызываться, если окно пустое, поэтому необходимо всегда сохранять хотя бы один элемент в GlobalWindow. Это, по-видимому, требует реализации довольно хакерских Evictor и ProcessAllWindowFunction, которые взаимодействуют, чтобы сохранить и игнорировать специальный элемент в окне - и вам также нужно каким-то образом в первую очередь поместить этот элемент в окно.

Если вы собираетесь сделать что-то взломанное, вариант 1 будет намного проще.

...