Тестирование окна Flink - PullRequest
       4

Тестирование окна Flink

0 голосов
/ 26 февраля 2020

У меня есть простое приложение Flink, которое суммирует события с тем же идентификатором и отметкой времени за последнюю минуту:

DataStream<String> input = env
                .addSource(consumerProps)
                .uid("app");

DataStream<Pixel> pixels = input.map(record -> mapper.readValue(record, Pixel.class));

pixels
        .keyBy("id", "timestampRoundedToMinutes")
        .timeWindow(Time.minutes(1))
        .sum("constant")
        .addSink(dynamoDBSink);

env.execute(jobName);

Я пытаюсь протестировать это приложение с рекомендуемым подходом в документация . Я также рассмотрел этот вопрос о стекопереработке , но добавление приемника не помогло.

У меня есть @ClassRule, как рекомендовано в моем тестовом классе. Функция выглядит следующим образом:

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

CollectSink.values.clear();

Pixel testPixel1 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel2 = Pixel.builder().id(2).timestampRoundedToMinutes("202002261220").constant(1).build();
Pixel testPixel3 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel4 = Pixel.builder().id(3).timestampRoundedToMinutes("202002261220").constant(1).build();

env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
    .keyBy("id","timestampRoundedToMinutes")
    .timeWindow(Time.minutes(1))
    .sum("constant")
    .addSink(new CollectSink());

JobExecutionResult result = env.execute("AggregationTest");
assertNotEquals(0, CollectSink.values.size());

CollectSink скопирован из документации .

Что я делаю не так? Есть также простой способ протестировать приложение со встроенной кафкой?

Спасибо!

1 Ответ

3 голосов
/ 26 февраля 2020

Причина, по которой ваш тест не пройден, заключается в том, что окно никогда не запускается. Задание завершается до того, как окно достигает конца выделенного времени.

Причина этого связана с тем, как вы работаете со временем. Указывая

.keyBy("id","timestampRoundedToMinutes")

, вы организуете для всех событий для одного и того же идентификатора и с отметками времени в течение одной минуты, чтобы они были в одном и том же окне. Но поскольку вы используете управление временем обработки (а не окно времени событий), ваш windows не закроется, пока время дня, когда выполняется тест, не пересекает границу от одной минуты до следующей. Поскольку для обработки требуется всего четыре события, ваша работа вряд ли будет работать достаточно долго, чтобы это произошло.

Вместо этого вам нужно сделать что-то вроде этого: установите для характеристики времени c время события и обеспечить извлечение метки времени и присваивание водяного знака. Обратите внимание, что при этом нет необходимости вводить метку времени, округленную до минутных границ - это часть того, что время события windows делает в любом случае.

public static void main(String[] args) throws Exception {
    ...

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
        .assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .sum("constant")
        .addSink(new CollectSink());

    env.execute();
}

private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
    public TimestampsAndWatermarks() {
        super(/* delay to handle out-of-orderness */);
    }

    @Override
    public long extractTimestamp(Event event) {
        return event.timestamp;
    }
}

См. Документацию и учебные пособия для получения дополнительной информации о времени события, водяных знаках и управлении окнами.

...