Поведение окна скользящего счетчика - PullRequest
0 голосов
/ 25 августа 2018

Предположим, у нас есть такая структура данных:

Tuple2<ArryaList<Long>, Integer>

Первое поле имеет длину ArrayList и содержит метку времени, а поле Integer представляет собой число от 1 до 40 с именем channel. Цель состоит в том, чтобы агрегировать каждые 400 сообщений с одним и тем же ключом (channel) и применять к ним ReduceFunction (он просто объединяет временные метки 400 сообщений в первом поле кортежа). Я установил поле channel в качестве ключа для сообщений и создал окно подсчета 400. Например, если в качестве входных данных у нас есть сообщение 160000 , оно должно вывести строку 160000/400 = 400, и окно Count будет работать. по желанию. Проблема в том, что когда я использую окно Sliding Count, мое ожидаемое поведение:

Flink создает логические окна для каждого channel числа и применяет ReduceFunction в первый раз, , если длина логического окна достигает 400, после этого каждые 100 входных данных с тем же ключом в качестве ключа логического окна, также вызовет ReduceFunction для последних 400 сообщений в окне, поэтому мы должны иметь:

  • 160000 - 400 = 159600 // первый вход 400 вызовет функцию уменьшения в первый раз
  • 159600 / 100 = 1596 // после первых 400 входов, для каждых 100 входов Flink вызывает функцию уменьшения для последних 400 входов
  • 1 + 1596 = 1597 // Номер выводимой строки

Но при запуске окна Sliding Count выдается 1600 строк, которые имеют переменную длину. (Я ожидал, что длина выходов будет только 400)

Точка: Говоря Длина Я имею в виду размер ArrayList (первое поле Tuple2)

  • Первые 40 каналов -> длина 100
  • Второй 40 канал -> длина 299
  • Третий канал 40 -> длина 598
  • Четвертый 40 канал -> длина 997
  • Остаток 40 канала -> длина 400

Как я могу оправдать такое поведение и реализовать желаемое окно скользящего счета?

Вот исходный код:

DataStream<Tuple2<ArrayList<Long>, Integer>> data ;
data.keyBy(1).countWindow(400, 100)
                 .reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() {
             @Override
             public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
                 t0.f0.add(t1.f0.get(0));
                 return t0;
             }
         }).writeAsText("results400").setParallelism(1);

Обновление: В соответствии с предложением @DavidAnderson я также попытался создать новый кортеж в ReduceFunstion вместо изменения t0, но это привело к тому же выводу.

public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
                         ArrayList<Long> times = t0.f0;

                         times.addAll(t1.f0);

                         return new Tuple2<>(times, t0.f1) ;
                     }

Ответы [ 2 ]

0 голосов
/ 25 августа 2018

Это реализация countWindow

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
    return window(GlobalWindows.create())
            .evictor(CountEvictor.of(size))
            .trigger(CountTrigger.of(slide));
}

, который не ведет себя так, как вы ожидаете. Окно запускается через каждые 100 элементов (слайд), независимо от того, содержит ли оно 400 элементов (размер). Максимальный размер элемента определяет количество элементов, которое нужно сохранить.

0 голосов
/ 25 августа 2018

Благодаря предложению Дэвида Андерсона , изменение ReduceFunction на следующее решает проблему.Мы должны создать новый объект в ReduceFunction:

public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
                         ArrayList<Long> times = new ArrayList<>();

                         times.addAll(t0.f0);
                         times.addAll(t1.f0);


                         return new Tuple2<>(times, t0.f1) ;
                     }

Обратите внимание, что оба подхода сокращения в вопросе приводят к неправильному выводу.Теперь вывод выглядит следующим образом:

  • Первый канал 40 -> длина 100
  • Второй канал 40 -> длина 200
  • третий 40-й канал -> длина 300
  • Остатки на каждые 40 каналов -> длина 400

Таким образом, поведение окна "Flink Sliding Count" вызывает ReduceFunction каждыйвходное сообщение скользящего счета.Таким образом, в случае, если у нас есть 160000 входных сообщений, номер результата должен быть: 160000/100 = 1600

...