Предположим, у нас есть такая структура данных:
Tuple2<ArryaList<Long>, Integer>
Первое поле имеет длину ArrayList
и содержит метку времени, а поле Integer представляет собой число от 1 до 40 с именем channel
. Цель состоит в том, чтобы агрегировать каждые 400 сообщений с одним и тем же ключом (channel
) и применять к ним ReduceFunction
(он просто объединяет временные метки 400 сообщений в первом поле кортежа).
Я установил поле channel
в качестве ключа для сообщений и создал окно подсчета 400. Например, если в качестве входных данных у нас есть сообщение 160000 , оно должно вывести строку 160000/400 = 400
, и окно Count будет работать. по желанию. Проблема в том, что когда я использую окно Sliding Count, мое ожидаемое поведение:
Flink создает логические окна для каждого channel
числа и применяет ReduceFunction
в первый раз, , если длина логического окна достигает 400, после этого каждые 100 входных данных с тем же ключом в качестве ключа логического окна, также вызовет ReduceFunction
для последних 400 сообщений в окне, поэтому мы должны иметь:
160000 - 400 = 159600
// первый вход 400 вызовет функцию уменьшения в первый раз
159600 / 100 = 1596
// после первых 400 входов, для каждых 100 входов Flink вызывает функцию уменьшения для последних 400 входов
1 + 1596 = 1597
// Номер выводимой строки
Но при запуске окна Sliding Count выдается 1600 строк, которые имеют переменную длину. (Я ожидал, что длина выходов будет только 400)
Точка: Говоря Длина Я имею в виду размер ArrayList (первое поле Tuple2)
- Первые 40 каналов -> длина 100
- Второй 40 канал -> длина 299
- Третий канал 40 -> длина 598
- Четвертый 40 канал -> длина 997
- Остаток 40 канала -> длина 400
Как я могу оправдать такое поведение и реализовать желаемое окно скользящего счета?
Вот исходный код:
DataStream<Tuple2<ArrayList<Long>, Integer>> data ;
data.keyBy(1).countWindow(400, 100)
.reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() {
@Override
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
t0.f0.add(t1.f0.get(0));
return t0;
}
}).writeAsText("results400").setParallelism(1);
Обновление: В соответствии с предложением @DavidAnderson я также попытался создать новый кортеж в ReduceFunstion
вместо изменения t0
, но это привело к тому же выводу.
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
ArrayList<Long> times = t0.f0;
times.addAll(t1.f0);
return new Tuple2<>(times, t0.f1) ;
}