Может ли Flink производить ежечасные снимки агрегированных / скользящих / накопленных данных? - PullRequest
0 голосов
/ 16 декабря 2018

Пример обработки потока в учебнике - программа подсчета слов с меткой времени.Со следующей выборкой данных

mario 10:00
luigi 10:01
mario 11:00
mario 12:00

я видел программы подсчета слов, созданные за:

Общий набор данных

mario 3
luigi 1

Набор разделов временного окна

mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 1
mario 12:00-13:00 1

Я не нашел, однако, пример программы подсчета слов по скользящему временному окну, т.е. я хотел бы, чтобы подсчет слов производился ежечаснодля каждого слова с начала времени:

mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 2
luigi 11:00-12:00 1
mario 12:00-13:00 3
luigi 12:00-13:00 1

Возможно ли это с Apache Flink или любой другой библиотекой обработки потоков?Спасибо!

edit:

До сих пор я пробовал вариант подхода Дэвида Андерсона, меняя только время обработки для времени события, поскольку данные имеют временную выборку.Это не работает, как я ожидал, хотя.Вот код, пример данных, результаты, которые он предоставляет, и мои дополнительные вопросы:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
            .setParallelism(1)
            .setMaxParallelism(1);

    env.setStreamTimeCharacteristic(EventTime);


    String fileLocation = "full file path here";
    DataStreamSource<String> rawInput = env.readFile(new TextInputFormat(new Path(fileLocation)), fileLocation);

    rawInput.flatMap(parse())
            .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() {
                @Nullable
                @Override
                public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) {
                    return new Watermark(extractedTimestamp - 1);
                }

                @Override
                public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) {
                    return element.getTimestamp();
                }
            })
            .keyBy(TimestampedWord::getWord)
            .process(new KeyedProcessFunction<String, TimestampedWord, Tuple3<String, Long, Long>>() {
                private transient ValueState<Long> count;

                @Override
                public void open(Configuration parameters) throws Exception {
                    count = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
                }

                @Override
                public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    if (count.value() == null) {
                        count.update(0L);
                    }

                    long l = ((value.getTimestamp() / 10) + 1) * 10;
                    ctx.timerService().registerEventTimeTimer(l);

                    count.update(count.value() + 1);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    long currentWatermark = ctx.timerService().currentWatermark();
                    out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
                }
            })
            .addSink(new PrintlnSink());

    env.execute();
}

private static long fileCounter = 0;

private static FlatMapFunction<String, TimestampedWord> parse() {
    return new FlatMapFunction<String, TimestampedWord>() {
        @Override
        public void flatMap(String value, Collector<TimestampedWord> out) {
            out.collect(new TimestampedWord(value, fileCounter++));
        }
    };
}

private static class TimestampedWord {
    private final String word;
    private final long timestamp;

    private TimestampedWord(String word, long timestamp) {
        this.word = word;
        this.timestamp = timestamp;
    }

    public String getWord() {
        return word;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

private static class PrintlnSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<Tuple3<String, Long, Long>> {
    @Override
    public void invoke(Tuple3<String, Long, Long> value, Context context) throws Exception {
        System.out.println(value.getField(0) + "=" + value.getField(1) + " at " + value.getField(2));
    }
}

С файлом со следующими словами, каждое в новой строке:

mario,луиджи, Марио, Марио, Вилма, Фреда, боб, боб, Марио, дан, Дилан, Дилан, Фреда, Марио, Марио, деревенщина, BamBam, лето, анна, анна, Эда, анна, анна, анна, анна, анна

Создает следующий вывод:

mario=4 at 10
luigi=1 at 10
dan=1 at 10
bob=2 at 10
fred=1 at 10
vilma=1 at 10
dylan=2 at 20
fred=2 at 20
carl=1 at 20
anna=3 at 20
summer=1 at 20
bambam=1 at 20
mario=6 at 20
anna=7 at 9223372036854775807
edu=1 at 9223372036854775807

Что-то явно не так.Я получаю счет 3 для anna в 20, хотя третий экземпляр слова anna не появляется до позиции 22. Как ни странно edu появляется только в последнем снимке, даже если он появился раньшеanna с третьим экземпляром.Как я могу вызвать создание снимка каждые 10 «единиц времени», даже если не приходят сообщения (т. Е. Должны быть получены те же данные)?

Если бы кто-нибудь мог указать мне правильное направление, я был бы оченьблагодарны!

1 Ответ

0 голосов
/ 16 декабря 2018

Да, это не только возможно сделать с Flink, но это легко.Вы можете сделать это с помощью KeyedProcessFunction, которая поддерживает счетчик в состоянии ключа для количества раз, которое каждое слово / ключ появилось до сих пор во входном потоке.Затем используйте таймер для запуска отчетов.

Вот пример, в котором используются таймеры обработки.Он печатает отчет каждые 10 секунд.

public class DSExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1))
            .keyBy(x -> x)
            .process(new KeyedProcessFunction<String, String, Tuple3<Long, String, Integer>>() {
                private transient ValueState<Integer> counter;

                @Override
                public void open(Configuration parameters) throws Exception {
                    counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Integer.class));
                }

                @Override
                public void processElement(String s, Context context, Collector<Tuple3<Long, String, Integer>> collector) throws Exception {
                    if (counter.value() == null) {
                        counter.update(0);
                        long now = context.timerService().currentProcessingTime();
                        context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
                    }
                    counter.update(counter.value() + 1);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext context, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
                    long now = context.timerService().currentProcessingTime();
                    context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
                    out.collect(new Tuple3(now, context.getCurrentKey(), counter.value()));
                }
            })
            .print();

        env.execute();
    }
}

Обновлено:

Всегда лучше использовать время события, но это добавляет сложности.Большая часть дополнительной сложности связана с тем фактом, что в реальных приложениях вам, скорее всего, придется иметь дело с событиями не по порядку, которых вы избежали в своем примере, поэтому в этом случае мы можем сойти с рук довольно простымреализация.

Если вы измените две вещи, вы получите ожидаемые результаты.Во-первых, установка водяных знаков на extractedTimestamp - 1 является причиной неправильных результатов (например, вот почему anna = 3 на 20).Если вместо этого установить водяной знак на extractedTimestamp, эта проблема исчезнет.

Объяснение: Именно появление третьей анны создает водяной знак, который закрывает окно в момент времени 20. Третья анна имеетвременная метка 21, и поэтому в потоке за ней следует водяной знак 20, который закрывает второе окно и создает отчет, говорящий anna = 3.Да, первый edu прибыл раньше, но это был первый edu с отметкой времени 20. В момент поступления edu таймер для edu не установлен, а созданный таймер правильно настроен на 30, поэтому мыне слышите об edu до тех пор, пока не появится Водяной знак, по крайней мере, 30.

Другая проблема - логика таймера.Flink создает отдельный таймер для каждой клавиши, и вам нужно создавать новый таймер каждый раз, когда срабатывает таймер.В противном случае вы будете получать отчеты только о словах, которые поступили во время окна.Вы должны изменить код так, чтобы он выглядел так:

@Override
public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
    if (count.value() == null) {
        count.update(0L);
        setTimer(ctx.timerService(), value.getTimestamp());
    }

    count.update(count.value() + 1);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
    long currentWatermark = ctx.timerService().currentWatermark();
    out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
    if (currentWatermark < Long.MAX_VALUE) {
        setTimer(ctx.timerService(), currentWatermark);
    }
}

private void setTimer(TimerService service, long t) {
    service.registerEventTimeTimer(((t / 10) + 1) * 10);
}

С этими изменениями я получаю следующие результаты:

mario=4 at 10
luigi=1 at 10
fred=1 at 10
bob=2 at 10
vilma=1 at 10
dan=1 at 10
vilma=1 at 20
luigi=1 at 20
dylan=2 at 20
carl=1 at 20
bambam=1 at 20
mario=6 at 20
summer=1 at 20
anna=2 at 20
bob=2 at 20
fred=2 at 20
dan=1 at 20
fred=2 at 9223372036854775807
dan=1 at 9223372036854775807
carl=1 at 9223372036854775807
dylan=2 at 9223372036854775807
vilma=1 at 9223372036854775807
edu=1 at 9223372036854775807
anna=7 at 9223372036854775807
summer=1 at 9223372036854775807
bambam=1 at 9223372036854775807
luigi=1 at 9223372036854775807
bob=2 at 9223372036854775807
mario=6 at 9223372036854775807

Теперь, если вам нужно было фактически обработатьпорядок событий, это будет немного сложнее.Было бы необходимо, чтобы водяные знаки отставали от временных отметок на некоторую реалистичную величину, отражающую фактическую величину неупорядоченности, присутствующей в потоке, что затем потребовало бы возможности обрабатывать, если открыто более одного окна одновременно.Любое данное событие / слово может не принадлежать окну, которое будет закрываться следующим, и поэтому не должно увеличивать свой счетчик.Например, вы можете буферизовать эти «ранние» события в другом фрагменте состояния (например, ListState) или каким-либо образом поддерживать несколько счетчиков (возможно, в MapState).Кроме того, некоторые события могут быть запоздалыми, что делает недействительными более ранние отчеты, и вы захотите определить политику для их обработки.

...