Как поставить отметку времени прибытия и приема пищи в сиддхи CEP - PullRequest
0 голосов
/ 07 мая 2018

Я хочу поставить метку времени прибытия для потока, как только он прибудет на сервер IoT WSO2, и метку времени проглатывания, когда он используется механизмом CEP. Эти времена будут использоваться для вычисления Queuing latency и CEP latency следующим образом.

Queuing latency = ingestion time - arrival time
CEP latency = detection time - ingestion time

Ниже мой план выполнения

@Plan:name('Server_CEP')

@Plan:statistics('true')

@Plan:trace('true')

@plan:async(bufferSize='1024')


@Import('stream2_scep:1.0.0')
define stream eeg_stream (meta_sensorID_s2 int, meta_tupleID_s2 int, value_s2 int, generationTime_s2 long);

@Import('stream1_scep:1.0.0')
define stream ecg_stream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long);

@Export('cep_stream_scep:1.0.0')
define stream CEPStream (cep_event int, cepLatency long);


from every ecg = ecg_stream[value_s1 >= 50 ] ->  eeg = eeg_stream[value_s2 >= 50] within 10 sec

 select  ecg.value_s1 as  cep_event ,  convert(time:currentTimestamp(), 'long')  - ecg.generationTime_s1  as cepLatency

 insert into CEPStream;

Я могу найти время обнаружения как текущее время, когда обнаружено событие CEP. Я также использую @async с размером буфера 1024. Теперь вопрос заключается в том, как мне пометить время прибытия потока, как только он прибудет. Кроме того, вторая проблема заключается в том, как поставить метку времени проглатывания двигателя.

Может кто-нибудь сказать мне, как мне этого добиться?

PS: я смог добиться этого устройства Android, так как использовал неблокирующую очередь, и время прибытия было временем его поступления в очередь FIFO, а время приема - временем его удаления из очереди

Ответы [ 2 ]

0 голосов
/ 27 июня 2018

Я сделал это, создав execution plan, который получает поток x, ставит отметку времени и отправляет его в другой поток y.Пример кода такого плана выполнения:

@Plan:name('scep_s1_arrival_timestamping')

@Plan:statistics('false')

@Plan:trace('false')

@Import('stream1_scep:1.0.0')
define stream inputStream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long);

@Export('stream1_scep:2.0.0')
define stream outputStream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long, arrivalTime_s1 long);

from  inputStream
select meta_sensorID_s1 as meta_sensorID_s1 , meta_tupleID_s1 as meta_tupleID_s1,  value_s1 as value_s1,  generationTime_s1 as generationTime_s1, convert(time:timestampInMilliseconds (),'long') as arrivalTime_s1
insert into outputStream;

. Отметка времени выполняется с использованием convert(time:timestampInMilliseconds (),'long') as arrivalTime_s1.Обратите внимание, что скрытое используется для преобразования типа данных в long, который затем вставляется в переменную arrivalTime_s1.

0 голосов
/ 09 мая 2018

Желательно не использовать @plan:async(bufferSize='1024'), так как он будет применим ко всем потокам, связанным с приложением Siddhi. Следовательно, применяйте async(buffer.size = '1024') только к тем потокам, которые вы хотите сделать асинхронными.

например.

async(buffer.size = '1024')
define stream <stream name> (...);

Теперь, чтобы достичь того, что вы просили. Отправьте исходный поток make не асинхронно (синхронно), используйте этот поток в запросе и вставьте текущую временную метку в это событие, затем отправьте результат в другой поток, настроенный в асинхронном режиме, и, наконец, используйте второй поток для отдыха. обработки. Таким образом, вы также сможете добавлять время прибытия к событиям синхронизированным способом.

...