Я собираю события с разными идентификаторами, во входящих событиях есть n типов фиксированных идентификаторов. Я хочу собрать среднее количество прошлых событий, основываясь на временных рамках или нет. событий между разными типами идентификаторов.
Допустим, есть 2 устройства, отправляющих данные / событие с идентификаторами «a» и «b». Я хочу получить среднее значение за последние 5 минут данных для обоих устройств, а затем сравнить оба средних значения, чтобы принять какое-то решение.
С помощью этого кода я собираю данные за последние n минут и сохраняю их в 2 окнах.
`
@source(type='http', receiver.url='http://localhost:5007/SweetProductionEP', @map(type = 'json'))
define stream InProduction(name string, amount int);
define window hold_a(avg_amount double) length(1);
define window hold_b(avg_amount double) length(1);
from InProduction[name=='a']#window.timeBatch(5 min)
select avg(amount) as avg_amount
group by name
insert into hold_a;
from InProduction[name=='b']#window.timeBatch(5 min)
select avg(amount) as avg_amount
group by name
insert into hold_b;`
Окно hold_a и hold_b получат средние данные за последние 5 минут. Теперь я хочу сравнить данные из обоих окон и принять решение.
Я пытался соединиться в обоих окнах, но запрос на соединение не выполняется.