Как сделать расчеты / сравнения на 2 окнах? - PullRequest
0 голосов
/ 04 января 2019

Я собираю события с разными идентификаторами, во входящих событиях есть n типов фиксированных идентификаторов. Я хочу собрать среднее количество прошлых событий, основываясь на временных рамках или нет. событий между разными типами идентификаторов. Допустим, есть 2 устройства, отправляющих данные / событие с идентификаторами «a» и «b». Я хочу получить среднее значение за последние 5 минут данных для обоих устройств, а затем сравнить оба средних значения, чтобы принять какое-то решение.

С помощью этого кода я собираю данные за последние n минут и сохраняю их в 2 окнах. `

@source(type='http', receiver.url='http://localhost:5007/SweetProductionEP', @map(type = 'json'))
define stream InProduction(name string, amount int);


define window hold_a(avg_amount double) length(1);

define window hold_b(avg_amount double) length(1);


from InProduction[name=='a']#window.timeBatch(5 min)
select avg(amount) as avg_amount
group by name
insert into hold_a;

from InProduction[name=='b']#window.timeBatch(5 min)
select avg(amount) as avg_amount
group by name
insert into hold_b;`

Окно hold_a и hold_b получат средние данные за последние 5 минут. Теперь я хочу сравнить данные из обоих окон и принять решение.

Я пытался соединиться в обоих окнах, но запрос на соединение не выполняется.

1 Ответ

0 голосов
/ 05 января 2019

Вы должны использовать шаблон для достижения этой цели.Ниже запрос с выводом имени, которое имело самое высокое среднее значение в самое высокое AvgStream.

@source(type='http', receiver.url='http://localhost:5007/SweetProductionEP', @map(type = 'json'))
define stream InProduction(name string, amount int);

from InProduction[name=='a']#window.timeBatch(5 min)
select avg(amount) as avg_amount, name
insert into avgStream;

from InProduction[name=='b']#window.timeBatch(5 min)
select avg(amount) as avg_amount, name
insert into avgStream;`

from every(e1=avgStream -> e2=avgStream)
select ifthenelse(e1.avg_amount>e2.avg_amount,e1.name,e2.name) as highestAvgName
insert into HighestAvgStream;
...