Как настроить время окна Flink по ключу - PullRequest
0 голосов
/ 21 января 2020

Различные типы элементов поступают в источник, который я делю на разные окна по типу. Теперь каждое окно type's должно быть сконфигурировано со своим собственным тайм-аутом, чем один .byKey("type").window(TumblingProcessingTimeWindows.of(Time.seconds(1))).allowedLateness(Time.seconds(5)).

. Например: {{"type-1", 5m}, {"type-2" , 10m}, {"type-3", 1m}}

Итак, элементы, добавленные в окно с ключом type-1, имеют время излучения 5 минут, аналогично type-2 с 10 минутами и т. Д. on.

Я могу сделать это, поддерживая состояние со списком раз в KeyedProcessFunction, registerProcessingTimeTimer с таймаутом, настроенным для ключа.

Но вместо того, чтобы управлять состоянием и элементами, как мне это сделать, используя window? Я попробовал с кастомным Trigger, но не смог четко заставить его работать.

Ответы [ 2 ]

0 голосов
/ 21 января 2020

Я публикую ниже исправление кода для моего пользовательского подхода Trigger. Я не использовал Evictor, как предложено @David, но использовал FIRE_AND_PURGE в onProcessingTime. Этот подход, кажется, работает, протестировав несколько сценариев ios.

Конфигурация слота

private static final Map<String, Long> eventSlots = new HashMap<>();

static {
    eventSlots.put("type22", 30000l);
    eventSlots.put("type12", 15000l);
    eventSlots.put("type9", 10000l);
    eventSlots.put("type2", 20000l);
    eventSlots.put("type7", 13000l);
}

обработка кода

SingleOutputStreamOperator<Event> sourceStream = ...

someSourceStream
.keyBy("type")
.window(GlobalWindows.create())
.trigger(new Trigger<Event, GlobalWindow>() {

 @Override
 public TriggerResult onElement(Event element, long timestamp, GlobalWindow window,
         TriggerContext ctx) throws Exception {
     ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + eventSlots.get(element.getType()));
     return TriggerResult.CONTINUE;
 }

 @Override
 public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
         throws Exception {
     return TriggerResult.FIRE_AND_PURGE;
 }

 @Override
 public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx)
         throws Exception {
     return TriggerResult.CONTINUE;
 }

 @Override
 public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
     ctx.deleteProcessingTimeTimer(window.maxTimestamp());
 }

}).process(new ProcessWindowFunction<Event, Events, Tuple, Context> () {
     @Override
     public void process(Tuple arg0,
         ProcessWindowFunction<Event, Events, Tuple, GlobalWindow>.Context ctx,
         Iterable<Event> arg2, Collector<Event> arg3) throws Exception {
       List<Event> events = new ArrayList<>();
       arg2.forEach(events::add);
       arg3.collect(new Events(events));
});
0 голосов
/ 21 января 2020

Я думаю, что вы можете сделать это с помощью Global Windows вместе с пользовательским триггером (и, возможно, пользовательским Evictor). Альтернативой может быть использование пользовательского оконного присваивателя.

В целом, я не рекомендую углубляться в API пользовательского окна, потому что получающиеся в результате решения трудно понять.

...