Я пытаюсь сделать простой CEP с esper. У меня есть некоторые фиксированные данные потока, представленные в виде текстового файла, моя цель состоит в том, чтобы дать какой-либо шаблон для вывода для каждого события в потоке, во сколько совпадений оно было найдено.
Все шаблоны будут определяться некоторой фиксированной длиной окна, что означает, что они будут чем-то похожи - найдите каждый A и каждый B, следующий за ними без C, которые находятся на расстоянии не более 6 событий друг от друга ( A и B не обязательно являются смежными) (конечно, шаблоны могут быть более сложными). Чтобы смоделировать это, используя механизм времени esper, я присвоил каждому событию атрибут count - первое событие count = 1, второе - count = 2 и т. Д., И после отправки каждого события я увеличиваю время на 1 se c (см. код). Счет события также служит его идентификатором при подсчете количества совпадений, в котором оно появилось.
Я немного расстроился из-за структуры esper после нескольких попыток, которые не сработали. Я попытался использовать шаблон вида «каждый A -> каждый B, где таймер: в пределах (X se c)», но я обнаружил, что при использовании этого объекта newData, который я получаю в функции слушателя, кажется, ухудшает результаты вместо того, чтобы просто держать новый результат каждый раз, когда вызывается функция. Кроме того, при использовании такого рода паттернов через некоторое время бег становится настолько медленным, что в конечном итоге практически останавливается. Я также попытался, используя синтаксис -
select * from Event
match_recognize (
measures A as X, B as Y
pattern (A B)
define
*something*
)
Но я понимаю, что он соответствует только соседним событиям A, B, что не является моим намерением. Есть ли какой-нибудь простой способ сделать какой-нибудь фиксированный запрос длины окна с помощью esper? Должен ли я попробовать другой фреймворк? Я прилагаю весь код и приветствую любую помощь.
public class Main {
static int count = 0;
static Map<Integer, Integer> eventToOccurrences = new HashMap<>();
static String statementName = "mystatement";
static int toPrint = 1000;
static int eventsNum = 9999999;
public static void main(String[] _s) throws IOException {
EPCompiler compiler = EPCompilerProvider.getCompiler();
Configuration configuration = new Configuration();
configuration.getCommon().addEventType(Event.class);
CompilerArguments args = new CompilerArguments(configuration);
EPCompiled epCompiled;
try {
final var pattern =
"select * from pattern [every e1=Event(type='A') -> every e2=Event(type='B') where timer:within(5 sec)];";
epCompiled = compiler.compile("@name('" + statementName + "') " + pattern, args);
}
catch (EPCompileException ex) {
// handle exception here
throw new RuntimeException(ex);
}
EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime(configuration);
EPDeployment deployment;
try {
deployment = runtime.getDeploymentService().deploy(epCompiled);
}
catch (EPDeployException ex) {
// handle exception here
throw new RuntimeException(ex);
}
EPStatement statement =
runtime.getDeploymentService().getStatement(deployment.getDeploymentId(), statementName);
statement.addListener((newData, oldData, s, r) -> {
final var events = ((MapEventBean) newData[0]).getProperties();
for (var b : events.values()) {
BeanEventBean event = (BeanEventBean)b;
int count = (int) event.get("count");
int occurrences = eventToOccurrences.getOrDefault(count, 0);
eventToOccurrences.put(count, occurrences + 1);
}
});
sendEvents(runtime);
var fileName = "output.txt";
FileWriter fileWriter = new FileWriter(fileName);
PrintWriter printWriter = new PrintWriter(fileWriter);
for (int count = 0; count < eventsNum; count++) {
int occurrences = eventToOccurrences.getOrDefault(count, 0);
printWriter.println(Integer.toString(occurrences));
}
printWriter.close();
}
static void sendEvents(EPRuntime runtime){
BufferedReader reader;
try {
reader = new BufferedReader(new FileReader("synthetic_FOR_TRAIN_values.txt"));
String line = reader.readLine();
while (line != null) {
String[] s = line.split(",");
final var event = new Event(s[0], Double.parseDouble(s[1]), count);
runtime.getEventService().sendEventBean(event,"Event");
// read next line
line = reader.readLine();
count += 1;
runtime.getEventService().advanceTime(count*1000);
if(count % toPrint == 0){
System.out.println(Integer.toString(count));
}
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class Event {
private String type;
private double value;
private int count;
public Event(String type, double value, int count) {
this.type = type;
this.value = value;
this.count = count;
}
public String getType() {
return type;
}
public int getCount() {
return count;
}
public double getValue() {
return value;
}
}