Фильтрация уникальных событий в Apache Flink - PullRequest
0 голосов
/ 05 июля 2019

Я определяю определенные переменные в одном Java-классе и обращаюсь к нему с помощью другого класса, чтобы отфильтровать поток для уникальных элементов. Пожалуйста, используйте код, чтобы лучше понять проблему.

Проблема, с которой я сталкиваюсь, заключается в том, что эта функция фильтра не работает должным образом и не может фильтровать уникальные события. Я сомневаюсь, что переменная является общей для разных потоков, и это является причиной !? Пожалуйста, предложите другой метод, если это не правильный способ сделать это. Заранее спасибо.

**ClassWithVariables.java**
public static HashMap<String, ArrayList<String>> uniqueMap = new HashMap<>();


**FilterClass.java**
public boolean filter(String val) throws Exception {

       if(ClassWithVariables.uniqueMap.containsKey(key)) {

                Arraylist<String> al = uniqueMap.get(key);

                if(al.contains(val) {
                    return false;
                } else {
                    //Update the hashmap list(uniqueMap)                    
                    return true;    
                }


       } else {

               //Add to hashmap list(uniqueMap)
               return true;
       }

}

1 Ответ

0 голосов
/ 05 июля 2019

Правильный способ дедупликации потока заключается в разделении потока по ключу, так что все элементы, содержащие один и тот же ключ, будут обрабатываться одним и тем же работником, и с использованием механизма управляемого состояния ключа по ключу, чтобы состояние было ошибочнымтолерантный и масштабируемыйВот пример реализации:

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  env.addSource(new EventSource())
    .keyBy(e -> e.key)
    .flatMap(new Deduplicate())
    .print();

  env.execute();
}

public static class Deduplicate extends RichFlatMapFunction<Event, Event> {
  ValueState<Boolean> seen;

  @Override
  public void open(Configuration conf) {
    ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
    seen = getRuntimeContext().getState(desc);
  }

  @Override
  public void flatMap(Event event, Collector<Event> out) throws Exception {
    if (seen.value() == null) {
      out.collect(event);
      seen.update(true);
    }
  }
}

Это также может быть реализовано как RichFilterFunction, кстати.Но учтите, что если у вас неограниченное пространство клавиш, используемое состояние будет расти бесконечно, пока у вас не закончится куча или пространство на диске, в зависимости от того, какой из бэкэндов состояний Флинка вы выберете.Если это проблема, вы можете настроить политику хранения состояний через State Time-to-Live .

Обратите внимание, что совместное использование состояния между различными частями конвейера Flink невозможно.Вам нужно вывернуть вещи наизнанку по сравнению с тем, что может показаться нормальным, и привести поток событий в состояние, а не извлекать его.

...