Правильный способ дедупликации потока заключается в разделении потока по ключу, так что все элементы, содержащие один и тот же ключ, будут обрабатываться одним и тем же работником, и с использованием механизма управляемого состояния ключа по ключу, чтобы состояние было ошибочнымтолерантный и масштабируемыйВот пример реализации:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicate())
.print();
env.execute();
}
public static class Deduplicate extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> seen;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
seen = getRuntimeContext().getState(desc);
}
@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (seen.value() == null) {
out.collect(event);
seen.update(true);
}
}
}
Это также может быть реализовано как RichFilterFunction, кстати.Но учтите, что если у вас неограниченное пространство клавиш, используемое состояние будет расти бесконечно, пока у вас не закончится куча или пространство на диске, в зависимости от того, какой из бэкэндов состояний Флинка вы выберете.Если это проблема, вы можете настроить политику хранения состояний через State Time-to-Live .
Обратите внимание, что совместное использование состояния между различными частями конвейера Flink невозможно.Вам нужно вывернуть вещи наизнанку по сравнению с тем, что может показаться нормальным, и привести поток событий в состояние, а не извлекать его.