Это вызвано функцией pattern.within? [апач-флинк] - PullRequest
0 голосов
/ 21 июня 2019

C: 4, g: 3, h: 1, L: 4 данные поступают каждые 2 секунды. ч: 1 это значение, которое не имеет значения для меня. важные значения: C: 4, g: 3 и l: 4. У вас есть пять секунд 'pattern.within', когда начинается процесс. Поэтому я должен увидеть C: 3, g: 4, следующее значение должно быть L: 4. Код работает без сбоев при работе на intelj. Моя проблема заключается в ошибке 'filter function' при конвертации в jar и запуске в flink-1.3.2. Мой код

data_list_remove_duplicate = ['C:4','g:3','l:4'];
Pattern<String,?> pattern = Pattern.<String>begin("start").where(
    new SimpleCondition<String>() {
      @Override
      public boolean filter(String value) throws Exception {
         flag = false;
         patern_start_time = new Date();
         if (patern_end_time.before(patern_start_time)){
           patern_end_time = DateUtils.addSeconds(patern_start_time, addMinuteTime); //add seconds
           found_list.clear();
         }
         if(!data_list_remove_duplicate.contains(value)){
             if(!found_list.contains(value))
               found_list.add(value);
               if(control_count == 2){
                  control_count =1;
                  flag = false;
               }
               else
                 flag = true;
              }
              return flag;
           }
       } ).followedBy("middle").optional().within(Time.seconds(addMinuteTime)).where(
        new SimpleCondition<String>() {
           @Override
           public boolean filter(String middle) throws Exception {
             flag = false;
             if(!data_list_remove_duplicate.contains(middle) && !found_list.contains(middle)){
                found_list.add(middle);
                flag = true;
              }
              return flag;
           }
        }
  ).followedBy("end").optional().within(Time.seconds(addMinuteTime)).where(
        new SimpleCondition<String>() {
           @Override
           public boolean filter(String end) throws Exception {
              flag = false;
              if(!data_list_remove_duplicate.contains(end) && !found_list.contains(end)){
                   found_list.add(end);
                   flag = true;
               }
               return flag;
             }
          }
        );
        patern_start_time = new Date();
        pattern.within(Time.seconds(addMinuteTime));
        patern_end_time = DateUtils.addSeconds(patern_start_time, addMinuteTime); //add minute
        System.out.println("current date and time: " + patern_start_time);
        System.out.println("current date and time: " + patern_end_time);
        PatternStream<String> patternStream = CEP.pattern(stream_itemset_categorized,pattern);

        DataStream<String> result = patternStream.select(
                new PatternSelectFunction<String, String>() {
                    @Override
                    public String select(Map<String, List<String>> map) throws Exception {
                        control_count = 2;
                        Integer found = 0;
                        String found_anomaly = "";
                        for (String alert_count: found_list){
                            String[] count = alert_count.split(":");
                            found_anomaly += alert_count + ",";
                            found += Integer.parseInt(count[1]);
                        }
                        if (found_list.size() == 3)
                            found_list.clear();
                        return found_anomaly;
                    }
                }
        );
        result.print();
    }

но, если я удалю эту часть кода, остальная часть будет работать нормально. Эта часть, pattern.within, должна очистить found_list при запуске функции.

patern_start_time = new Date();
                        if (patern_end_time.before(patern_start_time)){
                            patern_end_time = DateUtils.addSeconds(patern_start_time, addMinuteTime); //add seconds
                            found_list.clear();
                        }

возможно ли что-то вызвать с помощью функции 'pattern.within'? Можете ли вы предложить другой метод?

...