Записать модульный тест для утверждения экземпляра функции flink можно сериализуемо - PullRequest
0 голосов
/ 18 июня 2019

У меня было это исключение во время выполнения:

org.apache.flink.api.common.InvalidProgramException: The implementation of the RichFlatMapFunction is not serializable. The object probably contains or references non serializable fields. 

Хотя я понимаю, что происходит и знаю, как это исправить, я хотел бы убедиться, что это больше не повторится.Когда кто-то добавляет непериализуемые поля к этому классу RichFlatMapFunction, я хотел бы, чтобы модульный тест не проходил, а не имел сбой во время выполнения.

Есть ли способ написать модульный тест, который бы утверждал, что функциясериализуемый, используя тот же код сериализации функции, что и flink?

1 Ответ

0 голосов
/ 18 июня 2019

Для этого сценария используйте Интеграционный тест .

В следующем коде строка env.execute(); будет запускать конвейер и сериализовать операторы MultiplyByTwo и CollectSink.

Вы можете использовать его таким же образом, чтобы проверить, сериализуемо ли RichFlatMapFunction.

public class ExampleIntegrationTest extends AbstractTestBase {

    @Test
    public void testMultiply() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(1);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new MultiplyByTwo())
                .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = new ArrayList<>();

        @Override
        public synchronized void invoke(Long value) throws Exception {
            values.add(value);
        }
    }
}

Ссылка: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing

...