Я сталкиваюсь с непредвиденным поведением при использовании предложения output every
вместе с предложением таблицы join
.
- У меня есть базовое приложение c с одним потоком
input
и 2 таблицы, в которых хранится другой список значений. Затем есть также 2 запроса: - Первый
query1
объединится с table1
, а при совпадении будет output first every 5 sec
. - Секунда
query2
будет действовать аналогично, присоединится к table2
и будет выводить первое найденное значение каждые 5 сек c. - Цель этого - каждые 5 секунд, когда в потоке
input
есть значение, которое содержится в таблице 1, будет совпадение, а если в таблице 2 есть значение, будет другое совпадение, и оба запроса будут молчать до следующих 5 секунд block.
приложение выглядит следующим образом:
@App:name("delays_tables_join")
define stream input(value string);
define stream table_input(value string);
define table table1(value string);
define table table2(value string);
@sink(type='log')
define stream LogStream (value string);
-- fill table1
@info(name='insert table 1')
from table_input[value == '1']
insert into table1;
-- fill table2
@info(name='insert table 2')
from table_input[value == '2']
insert into table1;
-- query input join with table 1, output once every 5 sec
@info(name='query1')
from input join table1 on input.value == table1.value
select input.value
output first every 5 sec
insert into LogStream;
-- query input join with table 2, output once every 5 sec
@info(name='query2')
from input join table2 on input.value == table2.value
select input.value
output first every 5 sec
insert into LogStream;
- При запуске этого приложения сначала его отправляют на
table_input
значения 1
и 2
для заполнения обеих таблиц И затем он начинает многократно отправлять во входной поток значения: 1
, 2
, 1
, 2
, 1
, 2
...
Ожидается, что он будет иметь значения LogStream
2 каждые 5 секунд, первое появление значения 1
и первое появление значения 2
.
- Но вместо этого все время появляется только первое вхождение значения
1
, но не значение 2
[2020-04-02_18-55-16_498] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays_tables_join : LogStream : Event{timestamp=1585846516098, data=[1], isExpired=false}
[2020-04-02_18-55-21_508] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays_tables_join : LogStream : Event{timestamp=1585846521098, data=[1], isExpired=false}
Обратите внимание, что при отсутствии объединения таблиц оба запроса работают должным образом. Пример без объединений:
@App:name("delays")
define stream Input(value string);
@sink(type='log')
define stream LogStream (value string);
@info(name='query1')
from Input[value == '1']
select value
output first every 5 sec
insert into LogStream;
@info(name='query2')
from Input[value == '2']
select value
output first every 5 sec
insert into LogStream;
это даст следующий вывод:
[2020-04-02_18-53-50_305] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846430304, data=[1], isExpired=false}
[2020-04-02_18-53-50_706] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846430305, data=[2], isExpired=false}
[2020-04-02_18-53-55_312] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846438305, data=[1], isExpired=false}
[2020-04-02_18-53-56_114] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846439305, data=[2], isExpired=false}
.
Мне было интересно, ожидается ли такое поведение, или там В дизайне приложения есть какие-либо ошибки.
Большое спасибо!