Да, это не только возможно сделать с Flink, но это легко.Вы можете сделать это с помощью KeyedProcessFunction, которая поддерживает счетчик в состоянии ключа для количества раз, которое каждое слово / ключ появилось до сих пор во входном потоке.Затем используйте таймер для запуска отчетов.
Вот пример, в котором используются таймеры обработки.Он печатает отчет каждые 10 секунд.
public class DSExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1))
.keyBy(x -> x)
.process(new KeyedProcessFunction<String, String, Tuple3<Long, String, Integer>>() {
private transient ValueState<Integer> counter;
@Override
public void open(Configuration parameters) throws Exception {
counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Integer.class));
}
@Override
public void processElement(String s, Context context, Collector<Tuple3<Long, String, Integer>> collector) throws Exception {
if (counter.value() == null) {
counter.update(0);
long now = context.timerService().currentProcessingTime();
context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
}
counter.update(counter.value() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
long now = context.timerService().currentProcessingTime();
context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
out.collect(new Tuple3(now, context.getCurrentKey(), counter.value()));
}
})
.print();
env.execute();
}
}
Обновлено:
Всегда лучше использовать время события, но это добавляет сложности.Большая часть дополнительной сложности связана с тем фактом, что в реальных приложениях вам, скорее всего, придется иметь дело с событиями не по порядку, которых вы избежали в своем примере, поэтому в этом случае мы можем сойти с рук довольно простымреализация.
Если вы измените две вещи, вы получите ожидаемые результаты.Во-первых, установка водяных знаков на extractedTimestamp - 1
является причиной неправильных результатов (например, вот почему anna = 3 на 20).Если вместо этого установить водяной знак на extractedTimestamp
, эта проблема исчезнет.
Объяснение: Именно появление третьей анны создает водяной знак, который закрывает окно в момент времени 20. Третья анна имеетвременная метка 21, и поэтому в потоке за ней следует водяной знак 20, который закрывает второе окно и создает отчет, говорящий anna = 3.Да, первый edu прибыл раньше, но это был первый edu с отметкой времени 20. В момент поступления edu таймер для edu не установлен, а созданный таймер правильно настроен на 30, поэтому мыне слышите об edu до тех пор, пока не появится Водяной знак, по крайней мере, 30.
Другая проблема - логика таймера.Flink создает отдельный таймер для каждой клавиши, и вам нужно создавать новый таймер каждый раз, когда срабатывает таймер.В противном случае вы будете получать отчеты только о словах, которые поступили во время окна.Вы должны изменить код так, чтобы он выглядел так:
@Override
public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
if (count.value() == null) {
count.update(0L);
setTimer(ctx.timerService(), value.getTimestamp());
}
count.update(count.value() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
long currentWatermark = ctx.timerService().currentWatermark();
out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
if (currentWatermark < Long.MAX_VALUE) {
setTimer(ctx.timerService(), currentWatermark);
}
}
private void setTimer(TimerService service, long t) {
service.registerEventTimeTimer(((t / 10) + 1) * 10);
}
С этими изменениями я получаю следующие результаты:
mario=4 at 10
luigi=1 at 10
fred=1 at 10
bob=2 at 10
vilma=1 at 10
dan=1 at 10
vilma=1 at 20
luigi=1 at 20
dylan=2 at 20
carl=1 at 20
bambam=1 at 20
mario=6 at 20
summer=1 at 20
anna=2 at 20
bob=2 at 20
fred=2 at 20
dan=1 at 20
fred=2 at 9223372036854775807
dan=1 at 9223372036854775807
carl=1 at 9223372036854775807
dylan=2 at 9223372036854775807
vilma=1 at 9223372036854775807
edu=1 at 9223372036854775807
anna=7 at 9223372036854775807
summer=1 at 9223372036854775807
bambam=1 at 9223372036854775807
luigi=1 at 9223372036854775807
bob=2 at 9223372036854775807
mario=6 at 9223372036854775807
Теперь, если вам нужно было фактически обработатьпорядок событий, это будет немного сложнее.Было бы необходимо, чтобы водяные знаки отставали от временных отметок на некоторую реалистичную величину, отражающую фактическую величину неупорядоченности, присутствующей в потоке, что затем потребовало бы возможности обрабатывать, если открыто более одного окна одновременно.Любое данное событие / слово может не принадлежать окну, которое будет закрываться следующим, и поэтому не должно увеличивать свой счетчик.Например, вы можете буферизовать эти «ранние» события в другом фрагменте состояния (например, ListState) или каким-либо образом поддерживать несколько счетчиков (возможно, в MapState).Кроме того, некоторые события могут быть запоздалыми, что делает недействительными более ранние отчеты, и вы захотите определить политику для их обработки.