Как реализовать триггер времени события Flink, который излучается после отсутствия событий, полученных в течение X минут - PullRequest
1 голос
/ 28 мая 2020

Я немного не понимаю, как работают триггеры Flink. Мой поток данных содержит события с sessionId, которые я агрегировал на основе этого sessionId. Каждый сеанс будет содержать события Started и Ended, однако иногда событие Ended будет потеряно.

Чтобы справиться с этим, я настроил триггер, который будет генерировать агрегированный сеанс всякий раз, когда обрабатывается завершенное событие . Но в случае, если в течение 2 минут из этого сеанса не поступает никаких событий, я хочу выдать все, что мы собрали на данный момент (наши приложения, которые отправляют события, отправляют пульс каждую минуту, поэтому, если мы не получаем никаких событий, сеанс считается потерянным) .

Я установил следующую функцию триггера:

public class EventTimeProcessingTimeTrigger extends Trigger<HashMap, TimeWindow> {
    private final long sessionTimeout;
    private long lastSetTimer;

    // Max session length set to 1 day
    public static final long MAX_SESSION_LENGTH = 1000l * 86400l;

    // End session events
    private static ImmutableSet<String> endSession = ImmutableSet.<String>builder()
            .add("Playback.Aborted")
            .add("Playback.Completed")
            .add("Playback.Error")
            .add("Playback.StartAirplay")
            .add("Playback.StartCasting")
            .build();

    public EventTimeProcessingTimeTrigger(long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    @Override
    public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        lastSetTimer = ctx.getCurrentProcessingTime() + sessionTimeout;
        ctx.registerProcessingTimeTimer(lastSetTimer);

        if(endSession.contains(element.get(Field.EVENT_TYPE))) {
            return TriggerResult.FIRE_AND_PURGE;
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ?
                TriggerResult.FIRE_AND_PURGE :
                TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(lastSetTimer);
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + sessionTimeout);
    }
}

Чтобы установить водяные знаки для событий, я использую водяные знаки, установленные приложениями, поскольку appEventTime может не совпадать с wallClock на сервере. Я извлекаю водяные знаки следующим образом:

DataStream<HashMap> playerEvents = env
                .addSource(kafkaConsumerEvents, "playerEvents(Kafka)")
                .name("Read player events from Kafka")
                .uid("Read player events from Kafka")
                .map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
                .name("Map Json to HashMap")
                .uid("Map Json to HashMap")
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<HashMap>(org.apache.flink.streaming.api.windowing.time.Time.seconds(30))
                {
                    @Override
                    public long extractTimestamp(HashMap element)
                    {
                        long timestamp = 0L;
                        Object timestampAsObject = (Object) element.get("CanonicalTime");
                        timestamp = (long)timestampAsObject;
                        return timestamp;
                    }
                })
                .name("Add CanonicalTime as timestamp")
                .uid("Add CanonicalTime as timestamp");

Теперь я нахожу странным то, что когда я запускаю код в отладке и устанавливаю точку останова в функции очистки триггера, он постоянно вызывается. Даже если в триггере не достигнута точка FIRE_AND_PURGE. Похоже, я совершенно не понял, как должен работать триггер. И что моя реализация вовсе не делает то, что я думаю.

Думаю, мой вопрос в том, когда триггер должен вызывать clear? И правильно ли это реализовать комбинированный EventTimeTrigger и ProcessingTimeTrigger?

Благодарен за всю помощь, которую я могу получить. )

Чтобы предоставить дополнительную информацию о том, как все настроено. Я настроил свою среду следующим образом:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.failureRateRestart(60, Time.of(60, TimeUnit.MINUTES), Time.of(60, TimeUnit.SECONDS)));
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Поэтому я использую EventTime для всего потока. Затем я создаю windows следующим образом:

DataStream<PlayerSession> playerSessions = sideEvents
                .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
                .window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
                .trigger(new EventTimeProcessingTimeTrigger(SESSION_TIMEOUT))
                .aggregate(new SessionAggregator())
                .name("Aggregate events into sessions")
                .uid("Aggregate events into sessions");

1 Ответ

1 голос
/ 30 мая 2020

Это сложная ситуация. Я не решаюсь точно предсказать, что будет делать этот код, но я могу объяснить некоторые из того, что происходит. реализовал обратный вызов onEventTime в вашем триггере. Но нигде вы не создаете таймер времени события. Если я что-то не пропустил, на самом деле ничто не использует время события или водяные знаки. Вы не реализовали триггер по времени события, и я не ожидал, что когда-либо будет вызван onEventTime.

Пункт 2: вашему триггеру не нужно вызывать clear. Flink заботится о вызове очистки триггеров в рамках очистки windows.

Пункт 3: ваш триггер пытается несколько раз запустить и очистить окно, что не кажется правильным. Я говорю это, потому что вы создаете новый таймер времени обработки для каждого элемента, и когда каждый таймер срабатывает, вы запускаете и очищаете окно. Вы можете запускать окно так часто, как захотите, но вы можете очистить окно только один раз, после чего оно исчезнет.

Пункт 4: Сессия windows - это особый вид окна, известный как слияние windows. Когда сеансы объединяются (что происходит постоянно по мере поступления событий), их триггеры объединяются, и один из них очищается. Вот почему вы так часто видите вызов clear.

Предложение: поскольку у вас есть сообщения поддержки активности раз в минуту и ​​вы собираетесь закрывать сеансы после 2 минут бездействия, похоже, что вы можете установить интервал сеанса на составлять 2 минуты, и это позволит избежать изрядной части того, что делает вещи такими сложными. Позвольте сеансу windows делать то, для чего они предназначены.

Предполагая, что это сработает, вы можете просто расширить Flink ProcessingTimeTrigger и переопределить его метод onElement, чтобы сделать это:

@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

    if (endSession.contains(element.get(Field.EVENT_TYPE))) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    return super(element, timestamp, window, ctx);
}

Таким образом, окно будет запущено после двух минут бездействия или явным событием завершения сеанса.

Вы должны иметь возможность просто унаследовать остальную часть поведения ProcessingTimeTrigger .

Если вы хотите использовать время события, используйте EventTimeTrigger в качестве суперкласса, и вам нужно будет найти способ убедиться, что ваши водяные знаки работают, даже когда поток становится бездействующим. См. этот ответ , чтобы узнать, как с этим справиться.

...