Flink окно сеанса с получением результата в конце - PullRequest
0 голосов
/ 05 апреля 2020

У меня есть сообщения kafka, похожие на следующий шаблон:

{ user: 'someUser', value: 'SomeValue' , timestamp:000000000}

С вычислением потока Flink, который выполняет некоторые действия по подсчету этих элементов.

Теперь я хотите объявить сеанс, собрать одно и то же значение user + в диапазоне X секунд как единое целое с последней отметкой времени, затем он будет перенаправлен в следующий поток только один раз

Так что я написал что-то вроде что:

data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Data>() {
        .....
    })
    .keyBy(new KeySelector<Data, String>(){

        .......
    })
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .aggregate(new AggregateFunction<Data, Data, Data>() {

        @Override
        public Data createAccumulator() {
            return null;
        }

        @Override
        public Data add(Data value, Data accumulator) {
            if(accumulator == null) {
                accumulator = value;
            }
            return accumulator;

        }

        @Override
        public Data getResult(Data accumulator) {
            return accumulator;
        }

        @Override
        public Data merge(Data a, Data b) {
            return a;
        }
   });

Но проблема в том, что функция getResult вызывается для каждого элемента, а не только в конце окна.

Моя проблема заключается в том, чтобы не пересылать агрегацию результат до конца окна до следующего потока. насколько я знаю, также результат процесса потока движется вперед, когда больше нет элементов, даже если windows не конец да

любой совет?

Спасибо

1 Ответ

0 голосов
/ 05 апреля 2020

Flink предлагает два различных подхода к оценке windows. В этом случае вы хотите использовать другой.

Один подход оценивает содержимое каждого окна постепенно. Это то, что вы получаете с reduce и aggregate. Поскольку элементы назначаются окну, вызывается ReduceFunction или AggregateFunction, и этот элемент немедленно вносит свой вклад в конечный результат.

Альтернативой является использование process с ProcessWindowFunction. При таком подходе окно не оценивается до тех пор, пока оно не будет завершено, и в этот момент ProcessWindowFunction вызывается один раз с Iterable, содержащим все элементы, которые были назначены окну. Недостаток этого метода заключается в необходимости сохранять все элементы до тех пор, пока не будет запущено окно, и если ProcessWindowFunction нужно проделать большую работу, чтобы вычислить его результат, который может временно нарушить конвейер, но для этого необходимо выполнить некоторые вычисления. способ подсчета различных элементов.

Подробнее см. документацию .

...