У меня есть настройка checkpointing
в моей работе Flink, и у нее есть 2 скользящих окна (это не соединения) и 1 соединение с переворачивающимися окнами.Идея заключается в том, что мне не нужно сохранять состояние для самого join
, так как достаточно сохранить состояние для самих 2
раздвижных окон.Join
в конечном итоге находится в состоянии 20-30 ГБ, что приводит к задержке и аварийному завершению задания, а контрольная точка никогда не заканчивается сохранением.
Как я могу это сделать?
Я пытаюсь что-то вроде:
public class CustomJoin implements JoinFunction<A, A, A>, ListCheckPointed<A> {
@Override
public A join(A a, A b){
// Some irrelevant join logic
}
@Override
public List<A> snapshotState(long l, long l1) throws Exception {
return new ArrayList<>();
}
@Override
public void restoreState(List<A> list) throws Exception {
}
}
Действительно ли это позволяет избежать сохранения состояния для соединения?Это называется как:
stream
.assignTimestampsAndWatermarks(...)
.join(secondStream.assingTimestampsAndWatermarks(...))
.where(KeySelector...)
.equalTo(KeySelector...)
.window(TumblingEventTimeWindows.of(Time.minutes(1L))
.trigger(EventTimeTrigger.create())
.apply(new CustomJoin());
Это работает на практике?Каков наилучший способ избежать сохранения состояния?