Кто-нибудь знает, как проверить оконные функции в Flink
?Я использую зависимость flink-test-utils_2.11
.
Мои шаги:
- Получить
StreamExecutionEnvironment
- Создать объекты и добавить в среду
- Сделать
keyBy
- , добавить окно сеанса
- , выполнить агрегатную функцию
public class AggregateVariantCEVTest extends AbstractTestBase {
@Test
public void testAggregateVariantCev() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromElements(objectOne, objectTwo)
.keyBy(new KeyedByMyCustomKey())
.window(EventTimeSessionWindows.withGap(Time.seconds(1)))
.aggregate(new MyAgreggateFunction());
JobExecutionResult result = env.execute();
assertEquals(myExpectedResults, result.getAllAccumulatorResults());
}
}
Проблема в том, что размер result.getAllAccumulatorResults()
равен 0.
Есть идеи, что я делаю не так?Заранее спасибо!