состояние агрегата флинк огромно, как исправить - PullRequest
0 голосов
/ 06 января 2020

Я пытаюсь подсчитать данные в потоке с другим размером окна (размер окна в данных Steam), поэтому я использую пользовательские WindowAssigner и AggregateFunction, но состояние огромно (диапазон окна от одного часа до 30 дней)

На мой взгляд, агрегатное состояние - это только промежуточный результат хранения

Что-то не так?

public class ElementProcessingTime extends WindowAssigner<Element, TimeWindow> {
    @Override public Collection<TimeWindow> assignWindows(Element element, long timestamp, WindowAssignerContext context) {
        long slide = Time.seconds(10).toMilliseconds();
        long size = element.getTime() * 60 * 1000;
        timestamp = context.getCurrentProcessingTime();

        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, 0, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }

    @Override public Trigger<FactorCalDetail, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ElementTimeTrigger.create();
    }

    @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override public boolean isEventTime() {
        return false;
    }
}

public class CountAggregate implements AggregateFunction<FactorCalDetail, AggregateResult, AggregateResult> {

    @Override public AggregateResult createAccumulator() {
        AggregateResult result = new AggregateResult();
        result.setResult(0.0);
        return result;
    }

    @Override public AggregateResult add(FactorCalDetail value, AggregateResult accumulator) {
        accumulator.setKey(value.getGroupKey());
        accumulator.addResult();
        accumulator.setTimeSpan(value.getTimeSpan());
        return accumulator;
    }

    @Override public AggregateResult getResult(AggregateResult accumulator) {
        return accumulator;
    }

    @Override public AggregateResult merge(AggregateResult a, AggregateResult b) {
        if (a.getKey().equals(b.getKey())) {
            a.setResult(a.getResult() + b.getResult());
        }
        return a;
    }
}

env.addSource(source)
    .keyBy(Element::getKey)
    .window(new ElementProcessingTime())
    .aggregate(new CountAggregate())
    .addSink(new RedisCustomizeSink(redisProperties));

Ответы [ 2 ]

1 голос
/ 06 января 2020

Когда вы назначаете пользовательский windows, размер состояния может быстро go из-под контроля. Это главным образом потому, что каждое окно должно содержать все записи, которые попадают в него, до тех пор, пока окно не будет агрегировано и в конечном итоге не будет удалено. В вашем коде также создается впечатление, что вы создаете огромное количество windows для каждой записи.

Вы не указали свой вариант использования, но я предполагаю, что вы действительно хотите рассчитать, сколько событий растягивается в определенный момент времени для каждого ключа с размером ячейки 10 мс. Если это так, то это не является прямым примером использования windows.

. То, что вы хотите сделать:

  1. Разделить ваше событие на более мелкие события.
  2. Группировка по ключу и корзине.
  3. Подсчитайте вашу корзину.

Грубый набросок в коде:

input.flatMap(element -> {
        ...
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            emit(new KeyTime(key, start));
        }
    })
    .keyBy(keyTime -> keyTime)
    .count()

Вы можете применить windows после keyBy для принудительного вызова определенных выходных свойств, таких как ожидание в течение нескольких минут, а затем вывод всего и игнорирование поздних событий.

Примечание: KeyTime - это простой POJO, содержащий ключ и время корзины.

редактировать: после вашего комментария решение на самом деле намного проще.

env.addSource(source)
    .keyBy(element -> new Tuple2<>(element.getKey(), element.getTime()))
    .count()
    .addSink(new RedisCustomizeSink(redisProperties));
0 голосов
/ 06 января 2020

Вы не говорите, что это за источник, и у него будет свое собственное состояние. Вы также не говорите, сколько есть уникальных ключей. Даже небольшое количество состояний на ключ может увеличиваться с увеличением количества уникальных ключей. Если проблема в конечном итоге приводит к росту состояния агрегатора, вы можете попытаться разбить оконную логику c на серию из двух windows: одну для агрегирования по часам, а другую для агрегирования ежечасных сводок к желаемому. временные рамки.

...