Apche Flink - Время события - PullRequest
       6

Apche Flink - Время события

0 голосов
/ 03 сентября 2018

Я хочу создать часы времени для моих событий в Apache flink. Я делаю это следующим образом

public class TimeStampAssigner implements AssignerWithPeriodicWatermarks<Tuple2<String, String>> {


    private final long maxOutOfOrderness = 0; // 3.5 

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple2<String, String> element, long previousElementTimestamp) {

        currentMaxTimestamp = new  Date().getTime();

        return currentMaxTimestamp;
    }



    @Override
    public Watermark getCurrentWatermark() {

        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);


    }

}

Пожалуйста, проверьте приведенный выше код и скажите, правильно ли я это делаю. По истечении времени события и назначения водяного знака я хочу обработать поток в функции процесса, в которой я буду собирать данные потока в течение 10 минут для разных ключей.

1 Ответ

0 голосов
/ 03 сентября 2018

Нет, это не подходящая реализация. Временная метка времени события должна быть детерминированной (то есть воспроизводимой), и она должна основываться на данных в потоке событий. Если вместо этого вы собираетесь использовать Date (). GetTime, то вы более или менее используете время обработки.

Обычно при обработке времени события ваши события будут иметь поле метки времени, а экстрактор метки времени будет возвращать значение этого поля.

Реализация, которую вы показали, потеряет большинство преимуществ, связанных с работой со временем события, например, возможность повторной обработки исторических данных для воспроизведения исторических результатов.

...