Простой CEP-запрос с помощью esper framework - PullRequest
0 голосов
/ 02 марта 2020

Я пытаюсь сделать простой CEP с esper. У меня есть некоторые фиксированные данные потока, представленные в виде текстового файла, моя цель состоит в том, чтобы дать какой-либо шаблон для вывода для каждого события в потоке, во сколько совпадений оно было найдено.

Все шаблоны будут определяться некоторой фиксированной длиной окна, что означает, что они будут чем-то похожи - найдите каждый A и каждый B, следующий за ними без C, которые находятся на расстоянии не более 6 событий друг от друга ( A и B не обязательно являются смежными) (конечно, шаблоны могут быть более сложными). Чтобы смоделировать это, используя механизм времени esper, я присвоил каждому событию атрибут count - первое событие count = 1, второе - count = 2 и т. Д., И после отправки каждого события я увеличиваю время на 1 se c (см. код). Счет события также служит его идентификатором при подсчете количества совпадений, в котором оно появилось.

Я немного расстроился из-за структуры esper после нескольких попыток, которые не сработали. Я попытался использовать шаблон вида «каждый A -> каждый B, где таймер: в пределах (X se c)», но я обнаружил, что при использовании этого объекта newData, который я получаю в функции слушателя, кажется, ухудшает результаты вместо того, чтобы просто держать новый результат каждый раз, когда вызывается функция. Кроме того, при использовании такого рода паттернов через некоторое время бег становится настолько медленным, что в конечном итоге практически останавливается. Я также попытался, используя синтаксис -

select * from Event 
match_recognize (
   measures A as X, B as Y
   pattern (A B)
   define 
     *something*
)

Но я понимаю, что он соответствует только соседним событиям A, B, что не является моим намерением. Есть ли какой-нибудь простой способ сделать какой-нибудь фиксированный запрос длины окна с помощью esper? Должен ли я попробовать другой фреймворк? Я прилагаю весь код и приветствую любую помощь.

public class Main {
    static int count = 0;
    static Map<Integer, Integer> eventToOccurrences = new HashMap<>();
    static String statementName = "mystatement";
    static int toPrint = 1000;
    static int eventsNum = 9999999;

    public static void main(String[] _s) throws IOException {
        EPCompiler compiler = EPCompilerProvider.getCompiler();
        Configuration configuration = new Configuration();
        configuration.getCommon().addEventType(Event.class);
        CompilerArguments args = new CompilerArguments(configuration);

        EPCompiled epCompiled;
        try {
            final var pattern = 
                 "select * from pattern [every e1=Event(type='A') -> every e2=Event(type='B') where timer:within(5 sec)];";
            epCompiled = compiler.compile("@name('" + statementName + "') " + pattern, args);
        }
        catch (EPCompileException ex) {
            // handle exception here
            throw new RuntimeException(ex);
        }

        EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime(configuration);
        EPDeployment deployment;
        try {
            deployment = runtime.getDeploymentService().deploy(epCompiled);
        }
        catch (EPDeployException ex) {
            // handle exception here
            throw new RuntimeException(ex);
        }

        EPStatement statement =
                runtime.getDeploymentService().getStatement(deployment.getDeploymentId(), statementName);

        statement.addListener((newData, oldData, s, r) -> {
            final var events = ((MapEventBean) newData[0]).getProperties();
            for (var b : events.values()) {
               BeanEventBean event = (BeanEventBean)b;
               int count = (int) event.get("count");
               int occurrences = eventToOccurrences.getOrDefault(count, 0);
               eventToOccurrences.put(count, occurrences + 1);
            }
        });

        sendEvents(runtime);

        var fileName = "output.txt";
        FileWriter fileWriter = new FileWriter(fileName);
        PrintWriter printWriter = new PrintWriter(fileWriter);

        for (int count = 0; count < eventsNum; count++) {
            int occurrences = eventToOccurrences.getOrDefault(count, 0);
            printWriter.println(Integer.toString(occurrences));
        }
        printWriter.close();
    }

    static void sendEvents(EPRuntime runtime){
        BufferedReader reader;
        try {
            reader = new BufferedReader(new FileReader("synthetic_FOR_TRAIN_values.txt"));
            String line = reader.readLine();
            while (line != null) {
                String[] s = line.split(",");
                final var event = new Event(s[0], Double.parseDouble(s[1]), count);
                runtime.getEventService().sendEventBean(event,"Event");
                // read next line
                line = reader.readLine();
                count += 1;
                runtime.getEventService().advanceTime(count*1000);
                if(count % toPrint == 0){
                    System.out.println(Integer.toString(count));
                }
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

public class Event {
    private String type;
    private double value;
    private int count;

    public Event(String type, double value, int count) {
        this.type = type;
        this.value = value;
        this.count = count;
    }

    public String getType() {
        return type;
    }
    public int getCount() {
        return count;
    }
    public double getValue() {
        return value;
    }
}

1 Ответ

0 голосов
/ 02 марта 2020

Здесь есть несколько вещей.

Слушатели получают несколько событий. Однако код прослушивателя, который вы опубликовали, просматривает только «newData [0]» и игнорирует все остальные выходные данные. Слушатель должен l oop по всем выходным событиям, чтобы вы видели все выходные данные, т.е. "for (EventBean event: newData)". В противном случае вы можете запутаться, если код печатает только первую выходную строку.

«каждый A -> каждый B» дает вам все возможные комбинации всех событий A, которые когда-либо происходили, и всех событий B, которые когда-либо происходили , Это означает, что после того, как поступили события 100 A, и когда наступает одно событие B, есть 100 выходных строк, каждая из которых имеет указанную комбинацию c AB. Это также исчерпает память, поскольку среда выполнения запомнит все события A, которые когда-либо происходили. Если вы исправите код слушателя, как я уже говорил, вы бы увидели это.

К сожалению, ваше сообщение фактически не содержит требования. Какой вариант использования? Каков ожидаемый вход и ожидаемый результат? Нет информации. Поэтому я даю несколько общих советов.

Типичный шаблон, когда порядок четко определен, (добавьте фильтр в B или добавьте шаблон защиты по своему вкусу):

select * from pattern [every a=A -> b=B]

Типичный соединение, которое удобно, не требуя упорядочения (добавьте предложение where или измените данные windows или сделайте внешнее соединение по своему вкусу):

select * from A#lastevent as a, B#lastevent as B 

Или измените распознавание совпадений, чтобы разрешить что-либо между:

match_recognize .... pattern (A anything* B)

Если «предшествующее» требование касается порядка поступления, то, например, «match_recognize .... pattern (A {0,6} B C)» или «pattern [A -> [0 : 6] B -> C] ". Если «предыдущее» требование касается сравнения временных меток и событий, поступающих не по порядку, используйте соединение «где A.after (B)» или подобное. Объединения не заботятся о порядке прибытия.

...