Я немного не понимаю, как работают триггеры Flink. Мой поток данных содержит события с sessionId, которые я агрегировал на основе этого sessionId. Каждый сеанс будет содержать события Started и Ended, однако иногда событие Ended будет потеряно.
Чтобы справиться с этим, я настроил триггер, который будет генерировать агрегированный сеанс всякий раз, когда обрабатывается завершенное событие . Но в случае, если в течение 2 минут из этого сеанса не поступает никаких событий, я хочу выдать все, что мы собрали на данный момент (наши приложения, которые отправляют события, отправляют пульс каждую минуту, поэтому, если мы не получаем никаких событий, сеанс считается потерянным) .
Я установил следующую функцию триггера:
public class EventTimeProcessingTimeTrigger extends Trigger<HashMap, TimeWindow> {
private final long sessionTimeout;
private long lastSetTimer;
// Max session length set to 1 day
public static final long MAX_SESSION_LENGTH = 1000l * 86400l;
// End session events
private static ImmutableSet<String> endSession = ImmutableSet.<String>builder()
.add("Playback.Aborted")
.add("Playback.Completed")
.add("Playback.Error")
.add("Playback.StartAirplay")
.add("Playback.StartCasting")
.build();
public EventTimeProcessingTimeTrigger(long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
lastSetTimer = ctx.getCurrentProcessingTime() + sessionTimeout;
ctx.registerProcessingTimeTimer(lastSetTimer);
if(endSession.contains(element.get(Field.EVENT_TYPE))) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return time == window.maxTimestamp() ?
TriggerResult.FIRE_AND_PURGE :
TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(lastSetTimer);
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + sessionTimeout);
}
}
Чтобы установить водяные знаки для событий, я использую водяные знаки, установленные приложениями, поскольку appEventTime может не совпадать с wallClock на сервере. Я извлекаю водяные знаки следующим образом:
DataStream<HashMap> playerEvents = env
.addSource(kafkaConsumerEvents, "playerEvents(Kafka)")
.name("Read player events from Kafka")
.uid("Read player events from Kafka")
.map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
.name("Map Json to HashMap")
.uid("Map Json to HashMap")
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<HashMap>(org.apache.flink.streaming.api.windowing.time.Time.seconds(30))
{
@Override
public long extractTimestamp(HashMap element)
{
long timestamp = 0L;
Object timestampAsObject = (Object) element.get("CanonicalTime");
timestamp = (long)timestampAsObject;
return timestamp;
}
})
.name("Add CanonicalTime as timestamp")
.uid("Add CanonicalTime as timestamp");
Теперь я нахожу странным то, что когда я запускаю код в отладке и устанавливаю точку останова в функции очистки триггера, он постоянно вызывается. Даже если в триггере не достигнута точка FIRE_AND_PURGE. Похоже, я совершенно не понял, как должен работать триггер. И что моя реализация вовсе не делает то, что я думаю.
Думаю, мой вопрос в том, когда триггер должен вызывать clear? И правильно ли это реализовать комбинированный EventTimeTrigger и ProcessingTimeTrigger?
Благодарен за всю помощь, которую я могу получить. )
Чтобы предоставить дополнительную информацию о том, как все настроено. Я настроил свою среду следующим образом:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(60, Time.of(60, TimeUnit.MINUTES), Time.of(60, TimeUnit.SECONDS)));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Поэтому я использую EventTime для всего потока. Затем я создаю windows следующим образом:
DataStream<PlayerSession> playerSessions = sideEvents
.keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
.window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
.trigger(new EventTimeProcessingTimeTrigger(SESSION_TIMEOUT))
.aggregate(new SessionAggregator())
.name("Aggregate events into sessions")
.uid("Aggregate events into sessions");