Как получить новейшую запись окна обработки минут - PullRequest
0 голосов
/ 23 октября 2018

У меня есть потоковое задание с использованием windows .

Моя цель - сгруппировать записи по их внутренним id, полученным в течение минуты, и передать только самую новую запись за id.

Я выяснил 2 возможных подхода:

  1. Использование reduce()

    stream.keyBy(Record::getId)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
        .reduce((rec1, rec2) -> rec2);
    

    Это прекрасно работает, но кажется расточительнымтребуется каждая и каждая запись.

  2. Использование process()

    stream.keyBy(Record::getId)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
        .process(new ProcessWindowFunction<Record, Object, Long, TimeWindow>() {
            @Override
            public void process(Long aLong, Context context, Iterable<Record> iterable, Collector<Object> collector) throws Exception {
                Record last = null;
                for (Record rec : iterable) {
                    if (last == null || last.getTimestamp() < rec.getTimestamp()) {
                        last = rec;
                    }
                }
                collector.collect(last);
            }
        });
    

Это также работает правильно.Я ожидал, что это будет быстрее, но это , а не (это примерно то же, что и решение 1.).

Можете ли вы порекомендовать лучший подход?

1 Ответ

0 голосов
/ 24 октября 2018

Ваше решение 1. кажется, лучший подход здесь.

Относительно вашего комментария:

Это прекрасно работает, но кажется расточительным, так как вызывается для каждой записи.

Проблема в том, что вы надеваетеНе знаю, какая запись будет последней.Следовательно, вам всегда нужно хранить последнюю просмотренную запись.Поскольку результат ReduceFunction сохраняется в состоянии (либо для следующей оценки метода, либо для возврата его в качестве результата), это именно то, что здесь происходит.

Ваше решение 2. на самом деле меньшеэффективный (с точки зрения хранения / памяти).Он запоминает все записи, поступившие в течение минуты, и перебирает все, когда оценивается окно.В отличие от этого, решение 1. хранит только одно значение (результат последней оценки функции).

Вы можете реализовать решение с обычным ProcessFunction и таймером, однако я не думаю, чтоэто будет значительно быстрее, чем окно + ReduceFunction.Более того, для этого потребуется намного больше кода.

...