Есть ли в тестах Apache Flink понятие виртуального времени, как в Reactor и RxJava? - PullRequest
0 голосов
/ 24 февраля 2019

В RxJava и Reactor существует понятие виртуального времени для тестирования операторов, которые зависят от времени.Я не могу понять, как это сделать во Флинке.Например, я собрал следующий пример, где я хотел бы поиграть с опоздавшими событиями, чтобы понять, как они обрабатываются.Однако я не в состоянии понять, как будет выглядеть такой тест?Есть ли способ объединить Flink и Reactor, чтобы сделать тесты лучше?

public class PlayWithFlink {

    public static void main(String[] args) throws Exception {

        final OutputTag<MyEvent> lateOutputTag = new OutputTag<MyEvent>("late-data"){};

        // TODO understand how BoundedOutOfOrderness is related to allowedLateness
        BoundedOutOfOrdernessTimestampExtractor<MyEvent> eventTimeFunction = new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent element) {
                return element.getEventTime();
            }
        };

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<MyEvent> events = env.fromCollection(MyEvent.examples())
                .assignTimestampsAndWatermarks(eventTimeFunction);

        AggregateFunction<MyEvent, MyAggregate, MyAggregate> aggregateFn = new AggregateFunction<MyEvent, MyAggregate, MyAggregate>() {
            @Override
            public MyAggregate createAccumulator() {
                return new MyAggregate();
            }

            @Override
            public MyAggregate add(MyEvent myEvent, MyAggregate myAggregate) {
                if (myEvent.getTracingId().equals("trace1")) {
                    myAggregate.getTrace1().add(myEvent);
                    return myAggregate;
                }
                myAggregate.getTrace2().add(myEvent);
                return myAggregate;
            }

            @Override
            public MyAggregate getResult(MyAggregate myAggregate) {
                return myAggregate;
            }

            @Override
            public MyAggregate merge(MyAggregate myAggregate, MyAggregate acc1) {
                acc1.getTrace1().addAll(myAggregate.getTrace1());
                acc1.getTrace2().addAll(myAggregate.getTrace2());
                return acc1;
            }
        };

        KeySelector<MyEvent, String> keyFn = new KeySelector<MyEvent, String>() {
            @Override
            public String getKey(MyEvent myEvent) throws Exception {
                return myEvent.getTracingId();
            }
        };

        SingleOutputStreamOperator<MyAggregate> result = events
                .keyBy(keyFn)
                .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
                .allowedLateness(Time.seconds(20))
                .sideOutputLateData(lateOutputTag)
                .aggregate(aggregateFn);


        DataStream lateStream = result.getSideOutput(lateOutputTag);

        result.print("SessionData");

        lateStream.print("LateData");

        env.execute();
    }
}

class MyEvent {
    private final String tracingId;
    private final Integer count;
    private final long eventTime;

    public MyEvent(String tracingId, Integer count, long eventTime) {
        this.tracingId = tracingId;
        this.count = count;
        this.eventTime = eventTime;
    }

    public String getTracingId() {
        return tracingId;
    }

    public Integer getCount() {
        return count;
    }

    public long getEventTime() {
        return eventTime;
    }

    public static List<MyEvent> examples() {
        long now = System.currentTimeMillis();
        MyEvent e1 = new MyEvent("trace1", 1, now);
        MyEvent e2 = new MyEvent("trace2", 1, now);
        MyEvent e3 = new MyEvent("trace2", 1, now - 1000);
        MyEvent e4 = new MyEvent("trace1", 1, now - 200);
        MyEvent e5 = new MyEvent("trace1", 1, now - 50000);
        return Arrays.asList(e1,e2,e3,e4, e5);
    }

    @Override
    public String toString() {
        return "MyEvent{" +
                "tracingId='" + tracingId + '\'' +
                ", count=" + count +
                ", eventTime=" + eventTime +
                '}';
    }
}

class MyAggregate {
    private final List<MyEvent> trace1 = new ArrayList<>();
    private final List<MyEvent> trace2 = new ArrayList<>();


    public List<MyEvent> getTrace1() {
        return trace1;
    }

    public List<MyEvent> getTrace2() {
        return trace2;
    }

    @Override
    public String toString() {
        return "MyAggregate{" +
                "trace1=" + trace1 +
                ", trace2=" + trace2 +
                '}';
    }
}

Результат выполнения этого:

SessionData:1> MyAggregate{trace1=[], trace2=[MyEvent{tracingId='trace2', count=1, eventTime=1551034666081}, MyEvent{tracingId='trace2', count=1, eventTime=1551034665081}]}
SessionData:3> MyAggregate{trace1=[MyEvent{tracingId='trace1', count=1, eventTime=1551034166081}], trace2=[]}
SessionData:3> MyAggregate{trace1=[MyEvent{tracingId='trace1', count=1, eventTime=1551034666081}, MyEvent{tracingId='trace1', count=1, eventTime=1551034665881}], trace2=[]}

Однако я ожидаю увидеть триггер lateStream дляe5 событие, которое должно быть за 50 секунд до срабатывания первого события.

1 Ответ

0 голосов
/ 25 февраля 2019

Если вы измените свой присваиватель водяных знаков таким образом

AssignerWithPunctuatedWatermarks eventTimeFunction = new AssignerWithPunctuatedWatermarks<MyEvent>() {
    long maxTs = 0;

    @Override
    public long extractTimestamp(MyEvent myEvent, long l) {
        long ts = myEvent.getEventTime();
        if (ts > maxTs) {
            maxTs = ts;
        }
        return ts;
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent event, long extractedTimestamp) {
        return new Watermark(maxTs - 10000);
    }
};

, тогда вы получите ожидаемые результаты.Я не рекомендую это - просто использую его, чтобы проиллюстрировать, что происходит.

Здесь происходит то, что BoundedOutOfOrdernessTimestampExtractor - это периодический генератор водяных знаков, который будет вставлять водяной знак в поток каждые 200 мсек (по умолчанию).Поскольку ваша работа завершается задолго до этого, единственным водяным знаком, с которым сталкивается ваша работа, является тот, который Флинк вставляет в конце каждого конечного потока (со значением MAX_WATERMARK).Задержка относится к водяным знакам, и событие, которое вы ожидали опоздать, успевает наступить до этого водяного знака.

Переключаясь на пунктуированные водяные знаки, вы можете заставить водяные знаки появляться чаще или точнее в определенных точках впоток.Как правило, в этом нет необходимости (и слишком частые водяные знаки приводят к накладным расходам), но это полезно, если вы хотите иметь строгий контроль над последовательностью водяных знаков.

Что касается написания тестов, вы можете взглянуть на тестовые наборы , используемые в собственных тестах Флинка, или в flink-spector .

Обновление:

Интервал времени, связанный с BoundedOutOfOrdernessTimestampExtractor, является спецификациейнасколько ожидается выход из строя потока.События, поступающие в пределах этой границы, не считаются запоздалыми, и таймеры времени событий не будут срабатывать до тех пор, пока не истечет эта задержка, что дает время для поступления событий, вышедших из строя.allowLateness применяется только к оконному API и описывает, как долго по истечении обычного времени срабатывания окна каркас сохраняет состояние окна, так что события все еще могут быть добавлены в окно и вызывать поздние срабатывания.По истечении этого дополнительного интервала состояние окна очищается, и последующие события отправляются на боковой выход (если настроено).

enter image description here

Поэтому, когда вы используете BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)), вы не говорите «подождите 10 секунд после каждого события на случай, если более ранние события могутеще приеду ".Но вы говорите, что ваши события должны быть не более 10 секунд не в порядке.Таким образом, если вы обрабатываете поток событий в реальном времени, это означает, что вы будете ждать не более 10 секунд в случае поступления более ранних событий.(А если вы обрабатываете исторические данные, то вы можете обрабатывать 10 секунд данных за 1 секунду или нет - знание того, что вы будете ждать n секунд времени передачи события, ничего не говорит о том, сколько на самом деле это займет времени.)

Подробнее об этом см. Время события и водяные знаки .

...