C: 4, g: 3, h: 1, L: 4 данные поступают каждые 2 секунды. ч: 1 это значение, которое не имеет значения для меня. важные значения: C: 4, g: 3 и l: 4. У вас есть пять секунд 'pattern.within', когда начинается процесс. Поэтому я должен увидеть C: 3, g: 4, следующее значение должно быть L: 4. Код работает без сбоев при работе на intelj. Моя проблема заключается в ошибке 'filter function' при конвертации в jar и запуске в flink-1.3.2. Мой код
data_list_remove_duplicate = ['C:4','g:3','l:4'];
Pattern<String,?> pattern = Pattern.<String>begin("start").where(
new SimpleCondition<String>() {
@Override
public boolean filter(String value) throws Exception {
flag = false;
patern_start_time = new Date();
if (patern_end_time.before(patern_start_time)){
patern_end_time = DateUtils.addSeconds(patern_start_time, addMinuteTime); //add seconds
found_list.clear();
}
if(!data_list_remove_duplicate.contains(value)){
if(!found_list.contains(value))
found_list.add(value);
if(control_count == 2){
control_count =1;
flag = false;
}
else
flag = true;
}
return flag;
}
} ).followedBy("middle").optional().within(Time.seconds(addMinuteTime)).where(
new SimpleCondition<String>() {
@Override
public boolean filter(String middle) throws Exception {
flag = false;
if(!data_list_remove_duplicate.contains(middle) && !found_list.contains(middle)){
found_list.add(middle);
flag = true;
}
return flag;
}
}
).followedBy("end").optional().within(Time.seconds(addMinuteTime)).where(
new SimpleCondition<String>() {
@Override
public boolean filter(String end) throws Exception {
flag = false;
if(!data_list_remove_duplicate.contains(end) && !found_list.contains(end)){
found_list.add(end);
flag = true;
}
return flag;
}
}
);
patern_start_time = new Date();
pattern.within(Time.seconds(addMinuteTime));
patern_end_time = DateUtils.addSeconds(patern_start_time, addMinuteTime); //add minute
System.out.println("current date and time: " + patern_start_time);
System.out.println("current date and time: " + patern_end_time);
PatternStream<String> patternStream = CEP.pattern(stream_itemset_categorized,pattern);
DataStream<String> result = patternStream.select(
new PatternSelectFunction<String, String>() {
@Override
public String select(Map<String, List<String>> map) throws Exception {
control_count = 2;
Integer found = 0;
String found_anomaly = "";
for (String alert_count: found_list){
String[] count = alert_count.split(":");
found_anomaly += alert_count + ",";
found += Integer.parseInt(count[1]);
}
if (found_list.size() == 3)
found_list.clear();
return found_anomaly;
}
}
);
result.print();
}
но, если я удалю эту часть кода, остальная часть будет работать нормально. Эта часть, pattern.within, должна очистить found_list при запуске функции.
patern_start_time = new Date();
if (patern_end_time.before(patern_start_time)){
patern_end_time = DateUtils.addSeconds(patern_start_time, addMinuteTime); //add seconds
found_list.clear();
}
возможно ли что-то вызвать с помощью функции 'pattern.within'? Можете ли вы предложить другой метод?