Как написать тестовый код Junit для Flink CEP с потребителем Kafka - PullRequest
0 голосов
/ 02 января 2019

У нас есть метод execute (), в котором мы используем FlinkKafkaConsumer08 в качестве источника Flink CEP, затем у нас есть шаблон CEP, и оповещение снова передается в другой теме кафки.Как я могу написать тестовый пример junit для этого метода execute ()?Может ли кто-нибудь предоставить мне пример кода Junit для этого?

Pattern.<WebConnectionUseCase>begin("start")
                .where(new SimpleCondition<WebConnectionUseCase>() {
                    public boolean filter(WebConnectionUseCase event) {
                        return ((event.getValues().getPredictedAvailableMemory()
                                - event.getValues().getAvailableMemory()) >= STARTDIFF);
                    }
                }).followedBy("middle").where(new IterativeCondition<WebConnectionUseCase>() {
                    public boolean filter(WebConnectionUseCase value, Context<WebConnectionUseCase> ctx)
                            throws Exception {

                        Iterable<WebConnectionUseCase> middleStops = ctx.getEventsForPattern("middle");
                        List<Double> diffMemoryList = new ArrayList<Double>();
                        List<Double> connectionList = new ArrayList<Double>();
                        middleStops.forEach(item -> diffMemoryList.add(item.getValues().getPredictedAvailableMemory()
                                - item.getValues().getAvailableMemory()));
                        middleStops.forEach(item -> connectionList.add(item.getValues().getConnection()));
                        return checkIncreasingPattern(diffMemoryList) && checkDecreasingPattern(connectionList);
                    }

                    private boolean checkDecreasingPattern(List<Double> list) {
                        //code
                    }

                    private boolean checkIncreasingPattern(List<Double> list) {
                        // code
                    }
                }).times(PATTERNCOUNT).consecutive().next("end").where(new SimpleCondition<WebConnectionUseCase>() {
                    @Override
                    public boolean filter(WebConnectionUseCase event) {
                        return ((event.getValues().getPredictedAvailableMemory()
                                - event.getValues().getAvailableMemory()) >= ENDDIFF);
                    }
                }).within(Time.minutes(TIMEOUTDURATION));

1 Ответ

0 голосов
/ 03 января 2019

Я бы инкапсулировал часть, которую вы хотите протестировать, в объект, который вы можете подключить к специальным источникам и приемникам для тестирования, а также к действующим источникам данных / приемникам для производства.

Для тестовой раковины вы можете использовать это:

public static class TestSink<OUT> implements SinkFunction<OUT> {

    // must be static
    public static final List values = new ArrayList<>();

    @Override
    public void invoke(OUT value, Context context) throws Exception {
        values.add(value);
    }
}

Тогда ваши тесты могут сравнить значения раковин с ожидаемыми результатами.

Проще написать тесты, которые выполняют обработку времени события (по сравнению с теми, которые используют время обработки), поскольку со временем обработки результаты не являются детерминированными. И легче иметь тесты, которые выполняются с параллелизмом 1, также ради достижения детерминированных результатов.

Здесь вы найдете несколько примеров тестов .

...