Я выполняю правило Siddhi и получаю следующее исключение:
java.lang.IllegalStateException: MarkIn consecutively called without calling markOut in org.wso2.siddhi.SiddhiApps.Bucket_#0_Query_#0.Siddhi.Queries.query_9_98e3c67d-e1c2-4923-97a8-bfe2862b114d.latency
at org.wso2.siddhi.core.util.statistics.metrics.SiddhiLatencyMetric.markIn(SiddhiLatencyMetric.java:55)
at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.process(MultiProcessStreamReceiver.java:78)
at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.receive(MultiProcessStreamReceiver.java:104)
at org.wso2.siddhi.core.query.input.stream.state.receiver.PatternMultiProcessStreamReceiver.receive(PatternMultiProcessStreamReceiver.java:60)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
at org.wso2.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:60)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:187)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:97)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:122)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
at org.wso2.siddhi.core.query.input.StateMultiProcessStreamReceiver.processAndClear(StateMultiProcessStreamReceiver.java:69)
at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.process(MultiProcessStreamReceiver.java:79)
at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.receive(MultiProcessStreamReceiver.java:104)
at org.wso2.siddhi.core.query.input.stream.state.receiver.PatternMultiProcessStreamReceiver.receive(PatternMultiProcessStreamReceiver.java:60)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
at org.wso2.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:60)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:187)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:97)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:122)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
at org.wso2.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:60)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:187)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:97)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:133)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:151)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:358)
at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:34)
at org.wso2.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:44)
at org.wso2.siddhi.core.stream.input.InputHandler.send(InputHandler.java:61)
at net.continuum.samza.siddhi.model.Query.send(Query.java:60)
at net.continuum.samza.siddhi.routing.router.SmartRouter.sendToQueries(SmartRouter.java:37)
at net.continuum.samza.siddhi.model.Bucket.consume(Bucket.java:56)
at net.continuum.samza.siddhi.model.Bucket.consume(Bucket.java:52)
at net.continuum.samza.siddhi.model.EventMapper.process(EventMapper.java:30)
at net.continuum.samza.siddhi.model.Rule.consume(Rule.java:40)
at net.continuum.samza.siddhi.model.EventMapper.process(EventMapper.java:30)
at net.continuum.samza.siddhi.model.Group.sendEvent(Group.java:89)
at net.continuum.samza.task.FlatMessageProcessor.sendEvent(FlatMessageProcessor.java:39)
at net.continuum.samza.task.FlatMessageProcessor.process(FlatMessageProcessor.java:31)
at net.continuum.samza.task.SiddhiMainStreamTask.process(SiddhiMainStreamTask.java:98)
at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
at org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:171)
at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:169)
at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:428)
at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:373)
at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:314)
at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:228)
at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:157)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:665)
at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:100)
at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:145)
Вот правило, которое я выполняю:
-- Demo Siddhi app: filter queries fan RegulatorStream events out into three
-- intermediate streams, and three chained pattern queries escalate a 0/1
-- value change from tempStream to tempStream1 to tempStream2 and finally
-- to outputStream.

-- Input stream; the queries below only inspect roomNo.
define stream RegulatorStream(roomNo int, deviceId long);

-- Intermediate streams carrying a single int value (0 or 1 in this app).
define stream tempStream (value int);
define stream tempStream1 (value int);
define stream tempStream2 (value int);

-- Final alert stream; every event inserted here is written by the log sink.
-- The name matches the target of the last query so the sink receives alerts.
@sink(type='log')
define stream outputStream(msg string);

-- Emit 0 into tempStream for every input event whose roomNo is not 24.
from RegulatorStream[roomNo != 24]
select 0 as value
insert into tempStream;

-- Emit 1 into tempStream for every input event whose roomNo is not 25.
from RegulatorStream[roomNo != 25]
select 1 as value
insert into tempStream;

-- Emit 1 into tempStream1 for every input event whose roomNo is not 26.
from RegulatorStream[roomNo != 26]
select 1 as value
insert into tempStream1;

-- Emit 1 into tempStream2 for every input event whose roomNo is not 27.
from RegulatorStream[roomNo != 27]
select 1 as value
insert into tempStream2;

-- For every tempStream event e1, match a following tempStream event e2 whose
-- value is the opposite one (0 then 1, or 1 then 0) within 15 minutes, and
-- emit 0 into tempStream1.
from every e1 = tempStream -> e2 = tempStream[(e1.value == 0 and e2.value == 1) or (e1.value == 1 and e2.value == 0) ] within 15 mins
select 0 as value
insert into tempStream1;

-- Same 0/1 change pattern on tempStream1; emits 0 into tempStream2.
from every e1 = tempStream1-> e2 = tempStream1[(e1.value == 0 and e2.value == 1) or (e1.value == 1 and e2.value == 0) ] within 15 mins
select 0 as value
insert into tempStream2;

-- Same 0/1 change pattern on tempStream2; emits the alert message into
-- outputStream.
from every e1 = tempStream2-> e2 = tempStream2[(e1.value == 0 and e2.value == 1) or (e1.value == 1 and e2.value == 0) ] within 15 mins
select "alert" as msg
insert into outputStream;
По сути, это лишь демонстрация того, что я пытаюсь сделать. Я знаю, что того же можно добиться одним запросом, но реальный случай, который мне нужно обработать, сложнее и должен обрабатываться именно таким способом. Сообщение, которое я отправляю, содержит roomNo, равный 29, и в этом случае я ожидаю получить оповещение (msg) в outputStream, но каждый раз получаю приведённое выше исключение.