Как правильно проверить функцию окна Flink? - PullRequest
0 голосов
/ 25 июня 2019

Кто-нибудь знает, как проверить оконные функции в Flink?Я использую зависимость flink-test-utils_2.11.

Мои шаги:

  1. Получить StreamExecutionEnvironment
  2. Создать объекты и добавить в среду
  3. Сделать keyBy
  4. , добавить окно сеанса
  5. , выполнить агрегатную функцию
public class AggregateVariantCEVTest extends AbstractTestBase {

   @Test
    public void testAggregateVariantCev() throws Exception  {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);

       env.fromElements(objectOne, objectTwo)
               .keyBy(new KeyedByMyCustomKey())
               .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
               .aggregate(new MyAgreggateFunction());


       JobExecutionResult result = env.execute();

       assertEquals(myExpectedResults, result.getAllAccumulatorResults());

   }
}

Проблема в том, что размер result.getAllAccumulatorResults() равен 0.

Есть идеи, что я делаю не так?Заранее спасибо!

Ответы [ 2 ]

1 голос
/ 26 июня 2019

Вероятно, правильным подходом здесь является использование TestHarness. Хорошим примером является WindowOperatorTest в самом проекте Flink.

Кроме того, вы можете проверить https://github.com/knaufk/flink-testing-pyramid для примеров того, как тестировать Flink Job на разных уровнях пирамиды тестирования, и документацию Flink по тестированию https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html.

1 голос
/ 25 июня 2019

Windows не помещает свои результаты в аккумуляторы. Вы должны прикрепить тестовый приемник к своей работе и затем сравнить содержимое этого приемника с тем, что вы ожидаете. Что-то вроде того, что показано в документации в разделе по интеграционному тестированию .

...