Исключение при выполнении siddhiQL - PullRequest
0 голосов
/ 17 февраля 2020

Я выполняю правило сиддхи и получаю следующее исключение -

java.lang.IllegalStateException: MarkIn consecutively called without calling markOut in org.wso2.siddhi.SiddhiApps.Bucket_#0_Query_#0.Siddhi.Queries.query_9_98e3c67d-e1c2-4923-97a8-bfe2862b114d.latency
    at org.wso2.siddhi.core.util.statistics.metrics.SiddhiLatencyMetric.markIn(SiddhiLatencyMetric.java:55)
    at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.process(MultiProcessStreamReceiver.java:78)
    at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.receive(MultiProcessStreamReceiver.java:104)
    at org.wso2.siddhi.core.query.input.stream.state.receiver.PatternMultiProcessStreamReceiver.receive(PatternMultiProcessStreamReceiver.java:60)
    at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
    at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
    at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
    at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
    at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
    at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
    at org.wso2.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:60)
    at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:187)
    at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:97)
    at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:122)
    at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
    at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
    at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
    at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
    at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
    at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
    at org.wso2.siddhi.core.query.input.StateMultiProcessStreamReceiver.processAndClear(StateMultiProcessStreamReceiver.java:69)
    at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.process(MultiProcessStreamReceiver.java:79)
    at org.wso2.siddhi.core.query.input.MultiProcessStreamReceiver.receive(MultiProcessStreamReceiver.java:104)
    at org.wso2.siddhi.core.query.input.stream.state.receiver.PatternMultiProcessStreamReceiver.receive(PatternMultiProcessStreamReceiver.java:60)
    at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
    at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
    at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
    at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
    at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
    at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
    at org.wso2.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:60)
    at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:187)
    at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:97)
    at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:122)
    at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
    at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
    at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
    at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
    at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
    at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
    at org.wso2.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:60)
    at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:187)
    at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:97)
    at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:133)
    at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:151)
    at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:358)
    at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:34)
    at org.wso2.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:44)
    at org.wso2.siddhi.core.stream.input.InputHandler.send(InputHandler.java:61)
    at net.continuum.samza.siddhi.model.Query.send(Query.java:60)
    at net.continuum.samza.siddhi.routing.router.SmartRouter.sendToQueries(SmartRouter.java:37)
    at net.continuum.samza.siddhi.model.Bucket.consume(Bucket.java:56)
    at net.continuum.samza.siddhi.model.Bucket.consume(Bucket.java:52)
    at net.continuum.samza.siddhi.model.EventMapper.process(EventMapper.java:30)
    at net.continuum.samza.siddhi.model.Rule.consume(Rule.java:40)
    at net.continuum.samza.siddhi.model.EventMapper.process(EventMapper.java:30)
    at net.continuum.samza.siddhi.model.Group.sendEvent(Group.java:89)
    at net.continuum.samza.task.FlatMessageProcessor.sendEvent(FlatMessageProcessor.java:39)
    at net.continuum.samza.task.FlatMessageProcessor.process(FlatMessageProcessor.java:31)
    at net.continuum.samza.task.SiddhiMainStreamTask.process(SiddhiMainStreamTask.java:98)
    at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    at org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
    at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:171)
    at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
    at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:169)
    at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:428)
    at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:373)
    at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:314)
    at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:228)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:157)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:665)
    at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:100)
    at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:145)

Следующее правило, что я выполняю

define stream RegulatorStream(roomNo int, deviceId long)
define stream tempStream (value int);
define stream tempStream1 (value int);
define stream tempStream2 (value int);

@sink(type='log')
define stream output(msg string);

from RegulatorStream[roomNo != 24]
select 0 as value
insert into tempStream;

from RegulatorStream[roomNo != 25]
select 1 as value
insert into tempStream;

from RegulatorStream[roomNo != 26]
select 1 as value
insert into tempStream1;

from RegulatorStream[roomNo != 27]
select 1 as value
insert into tempStream2;

from every e1 = tempStream -> e2 = tempStream[(e1.value == 0 and e2.value == 1) or (e1.value == 1 and e2.value == 0) ] within 15 mins
select 0 as value
insert into tempStream1;

from every e1 = tempStream1-> e2 = tempStream1[(e1.value == 0 and e2.value == 1) or (e1.value == 1 and e2.value == 0) ] within 15 mins
select 0 as value
insert into tempStream2;

from every e1 = tempStream2-> e2 = tempStream2[(e1.value == 0 and e2.value == 1) or (e1.value == 1 and e2.value == 0) ] within 15 mins
select "alert" as msg
insert into outputStream;

По сути, это просто демонстрация того, что я пытаюсь сделать. Я знаю, я могу достичь этого в одном запросе, но фактический случай, который я пытаюсь обработать, сложен и должен обрабатываться только таким способом. Сообщение, которое я отправляю здесь, содержит roomNo как 29, и в этом конкретном случае я ожидаю оповещения в виде msg в моем outputStream, но каждый раз, когда я получаю выше исключения

...