аггригация данных сиддхи, основанная на времени - PullRequest
0 голосов
/ 11 марта 2019

Я пытаюсь объединить данные датчиков на основе временных окон и записать их в Cassandra, как только они достигнут 30-секундного окна (свертывание).

Например, датчик с именем «temp» отправляет 3 показания в течение 30 секунд.Мне нравится получать среднее значение для этого датчика за последние 30 секунд и записывать среднее значение в Cassandra, когда окно завершается.

Это мой код

BasicConfigurator.configure();


        // Create Siddhi Application
        String siddhiApp = "define stream SensorEventStream (sensorid string, value double); " +
                " " +
                "@info(name = 'query1') " +
                "from SensorEventStream#window.time(30 sec)  " +
                "select sensorid, avg(value) as value " +
                "group by sensorid " +
                "insert into AggregateSensorEventStream ;";

        // Creating Siddhi Manager
        SiddhiManager siddhiManager = new SiddhiManager();

        //Generating runtime
        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

        //Adding callback to retrieve output events from query
        siddhiAppRuntime.addCallback("AggregateSensorEventStream", new StreamCallback() {


            @Override
            public void receive(org.wso2.siddhi.core.event.Event[] events) {
                 EventPrinter.print(events);
            }
        });

        //Retrieving input handler to push events into Siddhi
        InputHandler inputHandler = siddhiAppRuntime.getInputHandler("SensorEventStream");

        //Starting event processing
        siddhiAppRuntime.start();

        //Sending events to Siddhi
        inputHandler.send(new Object[]{"Temp", 26d});
        Thread.sleep(1000);
        inputHandler.send(new Object[]{"Temp", 25d});
        Thread.sleep(1000);
        inputHandler.send(new Object[]{"Temp", 24d});
        Thread.sleep(60000);
        inputHandler.send(new Object[]{"Temp", 23d});

        //Shutting down the runtime
        siddhiAppRuntime.shutdown();

        //Shutting down Siddhi
        siddhiManager.shutdown();

И вывод такой:

0 [main] INFO org.wso2.siddhi.core.util.EventPrinter  - [Event{timestamp=1552281656960, data=[Temp, 26.0], isExpired=false}]
1002 [main] INFO org.wso2.siddhi.core.util.EventPrinter  - [Event{timestamp=1552281657971, data=[Temp, 25.5], isExpired=false}]
2003 [main] INFO org.wso2.siddhi.core.util.EventPrinter  - [Event{timestamp=1552281658972, data=[Temp, 25.0], isExpired=false}]
62004 [main] INFO org.wso2.siddhi.core.util.EventPrinter  - [Event{timestamp=1552281718972, data=[Temp, 23.0], isExpired=false}]

Из этого демонстрационного кода я вижу, что он отправляет первое среднее значение temp для 3 событийсразу и через 30 секунд окно ничего не делает.затем печатает 23.

Как я могу получить уведомление, когда окно закроется через 30 секунд?Я думал, что это то, что делает функция приема.

Я не уверен, неправильно ли я здесь понял.Это возможно с Сидди вообще?

1 Ответ

0 голосов
/ 11 марта 2019

Это ожидаемое поведение, окно является скользящим окном. Здесь, когда приходит первое событие, 1-я секунда, окно содержит только первое событие, поэтому среднее значение равно 26. Затем, когда приходит второе событие, окно имеет как 26d, так и 25d, тогда среднее значение составляет 25,5. Аналогично, 3-я секунда в среднем 25d. Затем через 31, 32 и 33 секунды эти события истекают из окна. Поэтому, когда приходит ваше 4-е событие (63-я секунда), в окне присутствует только последнее событие, поэтому среднее значение будет само значением. Это окно вычисляет среднее значение, как только происходит событие, в зависимости от событий, полученных за последние 30 секунд до него.

От вашего вопроса вы, кажется, хотите окно timeBatch. Здесь среднее значение рассчитывается только в конце партии. Например, в этом случае 30-я, 60-я, 90-я секунда и так далее. Пожалуйста, смотрите timeBatch документацию для образцов.

...